# Source code for deepcausalmmm.postprocess.optimization_utils

"""
Utility functions for budget optimization with DeepCausalMMM.

This module provides helper functions to prepare data from DeepCausalMMM
model outputs for budget optimization, including data formatting,
curve parameter extraction, and integration with ResponseCurveFit.
"""

from typing import Dict, List, Optional, Tuple
import logging

import pandas as pd
import numpy as np

from .response_curves import ResponseCurveFit
from .optimization import BudgetOptimizer, OptimizationResult

logger = logging.getLogger('deepcausalmmm')


def prepare_optimization_data(
    contributions_df: pd.DataFrame,
    media_data: pd.DataFrame,
    *,
    date_col: str = 'week_monday',
    channel_col: str = 'channel',
    contribution_col: str = 'predicted',
    spend_col: str = 'spend',
    impressions_col: str = 'impressions'
) -> pd.DataFrame:
    """
    Prepare data from DeepCausalMMM outputs for response curve fitting and optimization.

    This function merges model contribution predictions with media
    spend/impression data (inner join on date and channel), renames the
    columns to the standard names used by ResponseCurveFit, and drops rows
    that cannot be used for curve fitting. The input frames are not modified.

    Parameters
    ----------
    contributions_df : pd.DataFrame
        Model contributions output with columns: date, channel, predicted
    media_data : pd.DataFrame
        Media data with columns: date, channel, spend, impressions
    date_col : str, default='week_monday'
        Name of the date column
    channel_col : str, default='channel'
        Name of the channel column
    contribution_col : str, default='predicted'
        Name of the contribution/prediction column
    spend_col : str, default='spend'
        Name of the spend column
    impressions_col : str, default='impressions'
        Name of the impressions column

    Returns
    -------
    pd.DataFrame
        Merged data ready for ResponseCurveFit with columns:
        week_monday, channel, predicted, spend, impressions.
        Only rows with spend > 0, impressions > 0 and predicted >= 0 are kept.

    Raises
    ------
    KeyError
        If either input frame lacks one of the columns it is required to have.

    Examples
    --------
    >>> # After training DeepCausalMMM model
    >>> contributions = model.get_contributions()  # Your model output
    >>> media_df = pd.read_csv('media_data.csv')
    >>>
    >>> optimization_data = prepare_optimization_data(
    ...     contributions_df=contributions,
    ...     media_data=media_df
    ... )
    """
    contribution_cols = [date_col, channel_col, contribution_col]
    media_cols = [date_col, channel_col, spend_col, impressions_col]

    # Fail early with a message that names the frame and the missing columns
    # (still a KeyError, which is what the column lookup itself would raise).
    for frame_name, frame, required in (
        ('contributions_df', contributions_df, contribution_cols),
        ('media_data', media_data, media_cols),
    ):
        missing = [col for col in required if col not in frame.columns]
        if missing:
            raise KeyError(
                f"{frame_name} is missing required column(s): {missing}"
            )

    # Copy only the columns that take part in the merge so the caller's
    # frames stay untouched and unrelated columns are not duplicated.
    contributions = contributions_df[contribution_cols].copy()
    media = media_data[media_cols].copy()

    # Ensure date column is datetime on both sides so the join keys compare equal
    contributions[date_col] = pd.to_datetime(contributions[date_col])
    media[date_col] = pd.to_datetime(media[date_col])

    # Merge contributions with media data
    merged = pd.merge(
        contributions,
        media,
        on=[date_col, channel_col],
        how='inner'
    )

    # Rename to standard column names for ResponseCurveFit
    merged = merged.rename(columns={
        date_col: 'week_monday',
        channel_col: 'channel',
        contribution_col: 'predicted',
        spend_col: 'spend',
        impressions_col: 'impressions'
    })

    # Remove rows with zero or negative spend/impressions (can't fit curves)
    # and rows with a negative predicted contribution.
    merged = merged[
        (merged['spend'] > 0) &
        (merged['impressions'] > 0) &
        (merged['predicted'] >= 0)
    ]

    logger.info(
        f"Prepared optimization data: {len(merged)} rows, "
        f"{merged['channel'].nunique()} channels"
    )

    return merged

def fit_response_curves_batch(
    data: pd.DataFrame,
    channels: Optional[List[str]] = None,
    *,
    bottom_param: bool = False,
    model_level: str = 'Overall',
    date_col: str = 'week_monday',
    generate_figures: bool = False,
    save_figures: bool = False,
    output_dir: Optional[str] = None
) -> Tuple[Dict[str, Dict], pd.DataFrame]:
    """
    Fit response curves for multiple channels in batch.

    This is a convenience wrapper around ResponseCurveFit that processes
    multiple channels and returns both dictionary and DataFrame formats.
    Fitting is best-effort per channel: a channel with fewer than 10 rows,
    a channel whose fit does not succeed, or a channel that raises during
    fitting is logged and skipped; the remaining channels are still fitted.

    Parameters
    ----------
    data : pd.DataFrame
        Data prepared by prepare_optimization_data() with columns:
        week_monday, channel, spend, impressions, predicted
    channels : List[str], optional
        List of channels to fit. If None, fits all channels in data
    bottom_param : bool, default=False
        Whether to fit non-zero intercept
    model_level : str, default='Overall'
        Aggregation level: 'Overall' or 'DMA'
    date_col : str, default='week_monday'
        Name of date column
    generate_figures : bool, default=False
        Whether to generate plots
    save_figures : bool, default=False
        Whether to save plots to files
    output_dir : str, optional
        Directory to save plots (required if save_figures=True)

    Returns
    -------
    curves_dict : Dict[str, Dict]
        Response curve parameters by channel
        (keys: top, bottom, saturation, slope, r_2)
    curves_df : pd.DataFrame
        Response curve parameters as DataFrame with columns
        channel, top, bottom, saturation, slope, r_2. The columns are
        present even when no curve could be fitted.

    Raises
    ------
    ValueError
        If save_figures is True and output_dir is not provided.

    Examples
    --------
    >>> # After preparing data
    >>> curves_dict, curves_df = fit_response_curves_batch(
    ...     data=optimization_data,
    ...     channels=['TV', 'Search', 'Social'],
    ...     generate_figures=True,
    ...     save_figures=True,
    ...     output_dir='./response_curves/'
    ... )
    >>> print(curves_df)
    """
    # Function-scope import (kept local as in the original), done once
    # instead of once per channel.
    import os

    if channels is None:
        channels = data['channel'].unique().tolist()

    if save_figures and not output_dir:
        raise ValueError("output_dir required when save_figures=True")

    # Create the figure directory once up front rather than per channel.
    if save_figures:
        os.makedirs(output_dir, exist_ok=True)

    logger.info(f"Fitting response curves for {len(channels)} channels...")

    param_names = ('top', 'bottom', 'saturation', 'slope', 'r_2')
    curves_dict: Dict[str, Dict] = {}
    curves_list: List[Dict] = []

    for channel in channels:
        # The whole per-channel step is guarded so that one bad channel
        # never aborts the batch.
        try:
            channel_data = data[data['channel'] == channel].copy()

            if len(channel_data) < 10:
                logger.warning(
                    f"Channel '{channel}' has insufficient data ({len(channel_data)} rows), skipping"
                )
                continue

            # Create ResponseCurveFit instance
            fitter = ResponseCurveFit(
                data=channel_data,
                bottom_param=bottom_param,
                model_level=model_level,
                date_col=date_col
            )

            output_path = None
            if save_figures:
                output_path = os.path.join(
                    output_dir, f"{channel}_response_curve.html"
                )

            # Fit the curve
            fitter.fit(
                title=f"{channel} Response Curve",
                x_label="Spend ($)",
                y_label="Predicted Response",
                generate_figure=generate_figures,
                save_figure=save_figures,
                output_path=output_path,
                print_r_sqr=True
            )

            # Store results if fitting succeeded
            if not fitter.fit_flag:
                logger.warning(f" {channel}: Fitting failed")
                continue

            params = {name: getattr(fitter, name) for name in param_names}
            curves_dict[channel] = params
            curves_list.append({'channel': channel, **params})

            logger.info(
                f" {channel}: Slope={fitter.slope:.3f}, "
                f"Saturation=${fitter.saturation:,.0f}, R²={fitter.r_2:.3f}"
            )
        except Exception as e:
            # Best-effort by design: record the failure (with traceback so it
            # can be diagnosed) and move on to the next channel.
            logger.error(f" {channel}: Error - {e}", exc_info=True)
            continue

    # Explicit columns keep the schema stable when nothing was fitted.
    curves_df = pd.DataFrame(curves_list, columns=['channel', *param_names])

    logger.info(f"Successfully fitted {len(curves_dict)}/{len(channels)} curves")

    return curves_dict, curves_df

def create_optimizer_from_model_output(
    contributions_df: pd.DataFrame,
    media_data: pd.DataFrame,
    budget: float,
    *,
    channels: Optional[List[str]] = None,
    num_weeks: int = 52,
    constraints: Optional[Dict[str, Dict[str, float]]] = None,
    method: str = 'trust-constr',
    generate_figures: bool = False,
    save_figures: bool = False,
    output_dir: Optional[str] = None
) -> Tuple[BudgetOptimizer, pd.DataFrame]:
    """
    End-to-end: Create optimizer from DeepCausalMMM model outputs.

    This function handles the complete workflow:
    1. Prepare data from model outputs
    2. Fit response curves for all channels
    3. Create and configure BudgetOptimizer

    Only channels whose response curve was fitted successfully are passed
    to the optimizer. Constraints naming any other channel cannot be applied;
    they are dropped and reported with a warning.

    Parameters
    ----------
    contributions_df : pd.DataFrame
        Model contribution predictions
    media_data : pd.DataFrame
        Media spend and impression data
    budget : float
        Total budget to optimize
    channels : List[str], optional
        Channels to include. If None, uses all channels
    num_weeks : int, default=52
        Planning horizon in weeks
    constraints : Dict[str, Dict[str, float]], optional
        Channel spend constraints
    method : str, default='trust-constr'
        Optimization method
    generate_figures : bool, default=False
        Whether to generate response curve plots
    save_figures : bool, default=False
        Whether to save plots
    output_dir : str, optional
        Directory for plots

    Returns
    -------
    optimizer : BudgetOptimizer
        Configured optimizer ready to run
    curves_df : pd.DataFrame
        Response curve parameters

    Raises
    ------
    ValueError
        If no response curve could be fitted for any channel.

    Examples
    --------
    >>> # Complete workflow from model outputs to optimizer
    >>> optimizer, curves = create_optimizer_from_model_output(
    ...     contributions_df=model_contributions,
    ...     media_data=media_df,
    ...     budget=1000000,
    ...     constraints={'TV': {'lower': 100000, 'upper': 600000}},
    ...     generate_figures=True,
    ...     save_figures=True,
    ...     output_dir='./optimization_results/'
    ... )
    >>>
    >>> # Run optimization
    >>> result = optimizer.optimize()
    >>> print(result.allocation)
    """
    # Step 1: Prepare data
    logger.info("Step 1: Preparing optimization data...")
    opt_data = prepare_optimization_data(contributions_df, media_data)

    # Step 2: Fit response curves
    logger.info("Step 2: Fitting response curves...")
    curves_dict, curves_df = fit_response_curves_batch(
        data=opt_data,
        channels=channels,
        generate_figures=generate_figures,
        save_figures=save_figures,
        output_dir=output_dir
    )

    if not curves_dict:
        raise ValueError("No response curves successfully fitted")

    # Step 3: Create optimizer
    logger.info("Step 3: Creating optimizer...")
    fitted_channels = list(curves_dict.keys())

    optimizer = BudgetOptimizer(
        budget=budget,
        channels=fitted_channels,
        response_curves=curves_dict,
        num_weeks=num_weeks,
        method=method
    )

    # Set constraints if provided
    if constraints:
        # Filter constraints to only include channels we have curves for
        valid_constraints = {
            ch: c for ch, c in constraints.items() if ch in curves_dict
        }

        # A constraint that cannot be applied (channel not fitted, or a
        # misspelled channel name) should not disappear without a trace.
        ignored = [ch for ch in constraints if ch not in curves_dict]
        if ignored:
            logger.warning(
                f"Ignoring constraints for channels without a fitted curve: {ignored}"
            )

        if valid_constraints:
            optimizer.set_constraints(valid_constraints)

    logger.info(f"Optimizer ready: {len(fitted_channels)} channels, ${budget:,.0f} budget")

    return optimizer, curves_df

def compare_current_vs_optimal(
    current_allocation: Dict[str, float],
    optimal_result: OptimizationResult,
    *,
    metric_name: str = "Response"
) -> pd.DataFrame:
    """
    Compare current budget allocation vs optimized allocation.

    The comparison covers the union of the channels in both allocations;
    a channel missing from one side is treated as having a spend of 0 there.

    Parameters
    ----------
    current_allocation : Dict[str, float]
        Current spend by channel
    optimal_result : OptimizationResult
        Result from optimizer.optimize(). Its ``allocation`` mapping and its
        ``by_channel`` frame (columns channel, total_response, roi) are read.
    metric_name : str, default='Response'
        Name of the metric being optimized; lower-cased and used in the
        ``optimal_<metric>`` column name.

    Returns
    -------
    pd.DataFrame
        Comparison table sorted by optimal spend (descending) with columns:
        channel, current_spend, optimal_spend, spend_delta, spend_delta_pct,
        optimal_<metric>, optimal_roi. ``spend_delta_pct`` is 0 when the
        current spend is not positive. Channels absent from
        ``by_channel`` get a response and ROI of 0. When both allocations
        are empty, an empty frame with these columns is returned.

    Examples
    --------
    >>> current = {'TV': 400000, 'Search': 350000, 'Social': 250000}
    >>> result = optimizer.optimize()
    >>>
    >>> comparison = compare_current_vs_optimal(current, result)
    >>> print(comparison)
    """
    # Get optimal allocation
    optimal_allocation = optimal_result.allocation
    response_col = f'optimal_{metric_name.lower()}'

    # Ensure same channels
    channels = sorted(set(current_allocation) | set(optimal_allocation))

    comparison = []
    for channel in channels:
        current_spend = current_allocation.get(channel, 0)
        optimal_spend = optimal_allocation.get(channel, 0)
        spend_delta = optimal_spend - current_spend

        # Get response from optimal result (first matching row wins)
        channel_row = optimal_result.by_channel[
            optimal_result.by_channel['channel'] == channel
        ]
        if len(channel_row) > 0:
            optimal_response = channel_row['total_response'].iloc[0]
            optimal_roi = channel_row['roi'].iloc[0]
        else:
            optimal_response = 0
            optimal_roi = 0

        # A percentage change is undefined without a positive baseline;
        # such rows report 0.
        if current_spend > 0:
            spend_delta_pct = spend_delta / current_spend * 100
        else:
            spend_delta_pct = 0

        comparison.append({
            'channel': channel,
            'current_spend': current_spend,
            'optimal_spend': optimal_spend,
            'spend_delta': spend_delta,
            'spend_delta_pct': spend_delta_pct,
            response_col: optimal_response,
            'optimal_roi': optimal_roi
        })

    # Explicit columns so that an empty comparison still has the
    # 'optimal_spend' column that the sort below needs. dict.fromkeys drops
    # the duplicate that appears when metric_name is 'roi'.
    columns = list(dict.fromkeys([
        'channel',
        'current_spend',
        'optimal_spend',
        'spend_delta',
        'spend_delta_pct',
        response_col,
        'optimal_roi',
    ]))
    df = pd.DataFrame(comparison, columns=columns)
    df = df.sort_values('optimal_spend', ascending=False).reset_index(drop=True)

    return df

def generate_optimization_report(
    result: OptimizationResult,
    curves_df: pd.DataFrame,
    current_allocation: Optional[Dict[str, float]] = None,
    *,
    output_path: Optional[str] = None
) -> str:
    """
    Generate a comprehensive text report of optimization results.

    The report has a summary, the optimal allocation per channel, the
    response curve parameters and, when a current allocation is given,
    a current-vs-optimal comparison.

    Parameters
    ----------
    result : OptimizationResult
        Optimization result. Reads ``success``, ``method``, ``allocation``,
        ``predicted_response`` and ``by_channel`` (columns channel,
        total_spend, spend_pct, total_response, response_pct, roi).
    curves_df : pd.DataFrame
        Response curve parameters (columns channel, saturation, slope, r_2)
    current_allocation : Dict[str, float], optional
        Current allocation for comparison
    output_path : str, optional
        Path to save report. The report text is returned either way.

    Returns
    -------
    str
        Formatted report text

    Examples
    --------
    >>> report = generate_optimization_report(
    ...     result=result,
    ...     curves_df=curves,
    ...     current_allocation={'TV': 400000, 'Search': 350000, 'Social': 250000},
    ...     output_path='optimization_report.txt'
    ... )
    >>> print(report)
    """
    rule = "-" * 80
    banner = "=" * 80

    total_budget = sum(result.allocation.values())
    # ROI is undefined for a zero budget; report that instead of dividing by zero.
    if total_budget:
        overall_roi = f"{result.predicted_response / total_budget:.3f}"
    else:
        overall_roi = "N/A"

    lines = [
        banner,
        "BUDGET OPTIMIZATION REPORT",
        banner,
        "",
        # Summary
        "SUMMARY",
        rule,
        f"Status: {'SUCCESS' if result.success else 'FAILED'}",
        f"Method: {result.method}",
        f"Total Budget: ${total_budget:,.0f}",
        f"Predicted Response: {result.predicted_response:,.0f}",
        f"Overall ROI: {overall_roi}",
        "",
    ]

    # Optimal Allocation
    lines.append("OPTIMAL ALLOCATION")
    lines.append(rule)
    for _, row in result.by_channel.iterrows():
        lines.append(
            f"{row['channel']:20s} | "
            f"${row['total_spend']:>12,.0f} ({row['spend_pct']:>5.1f}%) | "
            f"Response: {row['total_response']:>12,.0f} ({row['response_pct']:>5.1f}%) | "
            f"ROI: {row['roi']:>6.3f}"
        )
    lines.append("")

    # Response Curves
    lines.append("RESPONSE CURVE PARAMETERS")
    lines.append(rule)
    for _, row in curves_df.iterrows():
        lines.append(
            f"{row['channel']:20s} | "
            f"Saturation: ${row['saturation']:>12,.0f} | "
            f"Slope: {row['slope']:>5.3f} | "
            f"R²: {row['r_2']:>5.3f}"
        )
    lines.append("")

    # Comparison with current
    if current_allocation:
        lines.append("COMPARISON: CURRENT vs OPTIMAL")
        lines.append(rule)
        comparison = compare_current_vs_optimal(current_allocation, result)
        for _, row in comparison.iterrows():
            delta_str = f"{row['spend_delta']:+,.0f} ({row['spend_delta_pct']:+.1f}%)"
            lines.append(
                f"{row['channel']:20s} | "
                f"Current: ${row['current_spend']:>12,.0f} | "
                f"Optimal: ${row['optimal_spend']:>12,.0f} | "
                f"Delta: {delta_str:>25s}"
            )
        lines.append("")

    lines.append(banner)

    report = "\n".join(lines)

    if output_path:
        # The report contains non-ASCII text ("R²"); an explicit encoding
        # keeps the write independent of the platform's locale default.
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(report)
        logger.info(f"Report saved to: {output_path}")

    return report