Source code for deepcausalmmm.postprocess.optimization

"""
Budget Optimization for Marketing Mix Modeling.

This module provides budget optimization capabilities using response curves
from DeepCausalMMM. It uses constrained optimization to find the optimal
allocation of marketing budget across channels to maximize predicted response.

The optimizer uses Hill equation saturation curves fitted by ResponseCurveFit
to predict channel responses at different spend levels and finds the allocation
that maximizes total response subject to budget and business constraints.
"""

from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import logging

import numpy as np
import pandas as pd
import scipy.optimize as op

logger = logging.getLogger('deepcausalmmm')


[docs] @dataclass class OptimizationResult: """ Result from budget optimization. Attributes ---------- success : bool Whether optimization converged successfully allocation : Dict[str, float] Optimal spend allocation by channel predicted_response : float Total predicted response at optimal allocation by_channel : pd.DataFrame Detailed results by channel with spend, response, and ROI message : str Optimization status message method : str Optimization method used Examples -------- >>> result = optimizer.optimize() >>> if result.success: ... print(f"Optimal allocation: {result.allocation}") ... print(f"Expected response: {result.predicted_response:,.0f}") ... print(result.by_channel) """ success: bool allocation: Dict[str, float] predicted_response: float by_channel: pd.DataFrame message: str = "" method: str = "trust-constr"
[docs] class BudgetOptimizer: """ Optimize marketing budget allocation using response curves. Uses constrained optimization (trust-constr, SLSQP, or differential evolution) with Hill transformation curves from ResponseCurveFit to find optimal spend allocation that maximizes total response subject to business constraints. Parameters ---------- budget : float Total budget to allocate across all channels channels : List[str] List of channel names to include in optimization response_curves : Dict[str, Dict] Response curve parameters by channel from ResponseCurveFit. Each channel dict should contain: 'top', 'bottom', 'saturation', 'slope' num_weeks : int, default=52 Number of weeks for planning horizon (annual by default) method : str, default='trust-constr' Optimization method: 'trust-constr', 'SLSQP', 'differential_evolution', 'hybrid' Attributes ---------- constraints_df : pd.DataFrame or None DataFrame with channel-level constraints (lower, upper bounds) Examples -------- >>> # After fitting response curves with ResponseCurveFit >>> curves = { ... 'TV': {'top': 1000000, 'bottom': 0, 'saturation': 50000, 'slope': 1.5}, ... 'Search': {'top': 800000, 'bottom': 0, 'saturation': 30000, 'slope': 2.0}, ... 'Social': {'top': 600000, 'bottom': 0, 'saturation': 20000, 'slope': 1.8} ... } >>> >>> optimizer = BudgetOptimizer( ... budget=1000000, ... channels=['TV', 'Search', 'Social'], ... response_curves=curves, ... num_weeks=52 ... ) >>> >>> # Optional: Set channel-specific constraints >>> optimizer.set_constraints({ ... 'TV': {'lower': 50000, 'upper': 500000}, ... 'Search': {'lower': 100000, 'upper': 400000} ... }) >>> >>> # Run optimization >>> result = optimizer.optimize() >>> >>> # View results >>> if result.success: ... print("Optimal Allocation:") ... for channel, spend in result.allocation.items(): ... print(f" {channel}: ${spend:,.0f}") ... print(f"\\nTotal Response: {result.predicted_response:,.0f}") ... print(f"\\nDetailed Results:\\n{result.by_channel}") Notes ----- The optimizer maximizes total response using the Hill equation: .. math:: response = bottom + (top - bottom) * \\frac{spend^{slope}}{saturation^{slope} + spend^{slope}} Where: - `top`: Maximum response (saturation level) - `bottom`: Minimum response (typically 0) - `saturation`: Spend level at half-maximum response - `slope`: Steepness of the response curve The optimization problem is: .. math:: \\max_{x_1, ..., x_n} \\sum_{i=1}^{n} response_i(x_i) s.t. \\sum_{i=1}^{n} x_i = budget lower_i \\leq x_i \\leq upper_i \\quad \\forall i """
[docs] def __init__( self, budget: float, channels: List[str], response_curves: Dict[str, Dict], *, num_weeks: int = 52, method: str = 'trust-constr' ): """Initialize BudgetOptimizer with budget, channels, and response curves.""" self.budget = budget self.channels = channels self.response_curves = response_curves self.num_weeks = num_weeks self.method = method # Validate method valid_methods = ['trust-constr', 'SLSQP', 'differential_evolution', 'hybrid'] if method not in valid_methods: raise ValueError(f"Method must be one of {valid_methods}, got '{method}'") # Validate channels have curves missing_curves = [ch for ch in channels if ch not in response_curves] if missing_curves: raise ValueError(f"Missing response curves for channels: {missing_curves}") # Validate response curve parameters required_params = ['top', 'bottom', 'saturation', 'slope'] for channel in channels: missing_params = [p for p in required_params if p not in response_curves[channel]] if missing_params: raise ValueError( f"Channel '{channel}' missing required parameters: {missing_params}" ) # Initialize constraints self.constraints_df: Optional[pd.DataFrame] = None logger.info( f"Initialized BudgetOptimizer: " f"Budget=${budget:,.0f}, Channels={len(channels)}, Weeks={num_weeks}, Method={method}" )
[docs] def set_constraints(self, constraints: Dict[str, Dict[str, float]]) -> None: """ Set spend constraints for channels. Parameters ---------- constraints : Dict[str, Dict[str, float]] Channel constraints: {'channel': {'lower': min_spend, 'upper': max_spend}} Examples -------- >>> optimizer.set_constraints({ ... 'TV': {'lower': 50000, 'upper': 500000}, ... 'Search': {'lower': 100000, 'upper': 400000}, ... 'Social': {'lower': 25000, 'upper': 300000} ... }) Notes ----- - Channels not specified in constraints get default bounds: [0, budget] - Upper bounds are automatically capped at total budget - If lower > upper, lower is reset to 0 - Upper bounds cannot be 0 (would make channel unusable) """ constraints_list = [] for channel in self.channels: if channel in constraints: constraints_list.append({ 'channel': channel, 'lower': constraints[channel].get('lower', 0), 'upper': constraints[channel].get('upper', self.budget) }) else: # Default: no minimum, max is total budget constraints_list.append({ 'channel': channel, 'lower': 0, 'upper': self.budget }) self.constraints_df = pd.DataFrame(constraints_list) # Validate constraints invalid = self.constraints_df[self.constraints_df['upper'] == 0] if len(invalid) > 0: raise ValueError( f"Upper constraints cannot be 0 for channels: {invalid['channel'].tolist()}" ) # Ensure upper doesn't exceed budget self.constraints_df['upper'] = np.minimum(self.constraints_df['upper'], self.budget) # Ensure lower doesn't exceed upper self.constraints_df['lower'] = np.where( self.constraints_df['lower'] > self.constraints_df['upper'], 0, self.constraints_df['lower'] ) logger.info(f"Set constraints for {len(self.channels)} channels")
def _predict_response(self, spend: float, channel: str) -> float: """ Predict response using Hill equation. Parameters ---------- spend : float Weekly spend amount channel : str Channel name Returns ------- float Predicted response (weekly) """ params = self.response_curves[channel] bottom = params['bottom'] top = params['top'] saturation = params['saturation'] slope = params['slope'] # Hill equation with safe exponentiation try: spend_pow = spend ** slope sat_pow = saturation ** slope except (OverflowError, RuntimeWarning): # Handle overflow with clipping spend_pow = np.clip(spend ** slope, 0, 1e100) sat_pow = np.clip(saturation ** slope, 0, 1e100) response = bottom + (top - bottom) * spend_pow / (sat_pow + spend_pow) return float(response) def _objective(self, x: np.ndarray) -> float: """ Objective function to minimize (negative of total response). Parameters ---------- x : np.ndarray Array of total spend allocations (one per channel) Returns ------- float Negative total response (for minimization) """ # x contains total spend per channel, convert to weekly weekly_spend = x / self.num_weeks # Calculate total response across all channels total_response = 0.0 for i, channel in enumerate(self.channels): weekly_response = self._predict_response(weekly_spend[i], channel) total_response += weekly_response * self.num_weeks # Return negative (we minimize, but want to maximize response) return -total_response def _constraint_budget(self, x: np.ndarray) -> float: """ Equality constraint: sum of spend equals budget. Parameters ---------- x : np.ndarray Array of spend allocations Returns ------- float Difference from budget (should be 0 at optimum) """ return float(np.sum(x) - self.budget) def _get_bounds(self) -> List[Tuple[float, float]]: """ Get bounds for optimization. Returns ------- List[Tuple[float, float]] List of (lower, upper) bounds for each channel """ if self.constraints_df is None: # Default: no minimum, max is total budget return [(0.0, float(self.budget)) for _ in self.channels] bounds = [] for channel in self.channels: row = self.constraints_df[self.constraints_df['channel'] == channel] if len(row) > 0: bounds.append((float(row['lower'].iloc[0]), float(row['upper'].iloc[0]))) else: bounds.append((0.0, float(self.budget))) return bounds def _get_initial_guess(self) -> np.ndarray: """ Get initial guess for optimization. Uses equal allocation as starting point, adjusted to respect bounds. Returns ------- np.ndarray Initial spend allocation """ # Start with equal allocation x0 = np.full(len(self.channels), self.budget / len(self.channels)) # Adjust to respect bounds bounds = self._get_bounds() for i, (lower, upper) in enumerate(bounds): x0[i] = np.clip(x0[i], lower, upper) # Normalize to match budget exactly if np.sum(x0) > 0: x0 = x0 * (self.budget / np.sum(x0)) return x0
[docs] def optimize(self) -> OptimizationResult: """ Run optimization to find optimal budget allocation. Returns ------- OptimizationResult Optimization results including allocation, predicted response, and details Examples -------- >>> result = optimizer.optimize() >>> if result.success: ... print("Optimization successful!") ... print(f"Predicted response: {result.predicted_response:,.0f}") ... for channel, spend in result.allocation.items(): ... roi = result.by_channel[result.by_channel['channel']==channel]['roi'].iloc[0] ... print(f"{channel}: ${spend:,.0f} (ROI: {roi:.2f})") ... else: ... print(f"Optimization failed: {result.message}") """ logger.info(f"Starting budget optimization with method={self.method}...") # Get initial guess, bounds, and constraints x0 = self._get_initial_guess() bounds = self._get_bounds() constraints = {'type': 'eq', 'fun': self._constraint_budget} # Run optimization based on method if self.method == 'differential_evolution': result = self._optimize_global(bounds, constraints) elif self.method == 'hybrid': result = self._optimize_hybrid(bounds, constraints, x0) else: result = self._optimize_gradient(x0, bounds, constraints, self.method) if result.success: logger.info(f"Optimization converged: {result.message}") # Extract results allocation = {ch: float(spend) for ch, spend in zip(self.channels, result.x)} predicted_response = -result.fun # Negate back to positive # Build detailed results by channel by_channel = self._calculate_by_channel(result.x) return OptimizationResult( success=True, allocation=allocation, predicted_response=float(predicted_response), by_channel=by_channel, message=result.message, method=self.method ) else: logger.error(f"Optimization failed: {result.message}") return OptimizationResult( success=False, allocation={ch: 0.0 for ch in self.channels}, predicted_response=0.0, by_channel=pd.DataFrame(), message=result.message, method=self.method )
def _optimize_gradient( self, x0: np.ndarray, bounds: List[Tuple[float, float]], constraints: Dict, method: str ): """Gradient-based optimization (trust-constr or SLSQP).""" if method == 'trust-constr': # trust-constr: More robust than SLSQP, handles constraints better return op.minimize( fun=self._objective, x0=x0, method='trust-constr', bounds=bounds, constraints=constraints, options={ 'maxiter': 500, 'verbose': 1, 'gtol': 1e-8, 'xtol': 1e-10 } ) else: # SLSQP return op.minimize( fun=self._objective, x0=x0, method='SLSQP', bounds=bounds, constraints=constraints, options={ 'maxiter': 400, 'disp': True, 'ftol': 1e-9 }, jac='3-point' ) def _optimize_global(self, bounds: List[Tuple[float, float]], constraints: Dict): """Global optimization using differential evolution.""" from scipy.optimize import differential_evolution logger.info("Running global optimization (may take longer)...") return differential_evolution( func=self._objective, bounds=bounds, constraints=constraints, strategy='best1bin', maxiter=1000, popsize=15, tol=0.01, atol=0, seed=42, polish=True, # Local refinement at end workers=1 ) def _optimize_hybrid( self, bounds: List[Tuple[float, float]], constraints: Dict, x0: np.ndarray ): """Hybrid: global search followed by local refinement.""" from scipy.optimize import differential_evolution logger.info("Running hybrid optimization (global + local)...") # Stage 1: Quick global search logger.info("Stage 1: Global search...") global_result = differential_evolution( func=self._objective, bounds=bounds, constraints=constraints, maxiter=100, # Quick scan popsize=10, polish=False, workers=1 ) # Stage 2: Local refinement from global optimum logger.info("Stage 2: Local refinement...") local_result = op.minimize( fun=self._objective, x0=global_result.x, method='trust-constr', bounds=bounds, constraints=constraints, options={'maxiter': 300, 'verbose': 1} ) return local_result def _calculate_by_channel(self, optimal_spend: np.ndarray) -> pd.DataFrame: """ Calculate detailed metrics by channel. Parameters ---------- optimal_spend : np.ndarray Optimal spend allocation (total over planning horizon) Returns ------- pd.DataFrame Results by channel with spend, response, ROI, and saturation metrics """ results = [] weekly_spend = optimal_spend / self.num_weeks for i, channel in enumerate(self.channels): weekly_response = self._predict_response(weekly_spend[i], channel) total_response = weekly_response * self.num_weeks params = self.response_curves[channel] # Calculate saturation percentage saturation_pct = (weekly_spend[i] / params['saturation']) * 100 if params['saturation'] > 0 else 0 results.append({ 'channel': channel, 'total_spend': optimal_spend[i], 'weekly_spend': weekly_spend[i], 'weekly_response': weekly_response, 'total_response': total_response, 'roi': total_response / optimal_spend[i] if optimal_spend[i] > 0 else 0, 'spend_pct': (optimal_spend[i] / self.budget) * 100, 'response_pct': 0, # Will calculate after summing 'saturation_point': params['saturation'], 'saturation_pct': saturation_pct, 'slope': params['slope'] }) df = pd.DataFrame(results) # Calculate response percentage total_resp = df['total_response'].sum() if total_resp > 0: df['response_pct'] = (df['total_response'] / total_resp) * 100 # Sort by total spend descending df = df.sort_values('total_spend', ascending=False).reset_index(drop=True) return df
[docs] def compare_scenarios( self, scenarios: Dict[str, Dict[str, float]] ) -> pd.DataFrame: """ Compare different budget allocation scenarios. Parameters ---------- scenarios : Dict[str, Dict[str, float]] Dictionary of scenarios: {'scenario_name': {'channel': spend, ...}} Returns ------- pd.DataFrame Comparison of scenarios with predicted responses and ROIs Examples -------- >>> scenarios = { ... 'Current': {'TV': 400000, 'Search': 350000, 'Social': 250000}, ... 'Optimized': result.allocation, ... 'Heavy TV': {'TV': 600000, 'Search': 250000, 'Social': 150000} ... } >>> comparison = optimizer.compare_scenarios(scenarios) >>> print(comparison) """ comparison = [] for scenario_name, allocation in scenarios.items(): # Validate allocation if set(allocation.keys()) != set(self.channels): logger.warning( f"Scenario '{scenario_name}' has different channels, skipping" ) continue total_spend = sum(allocation.values()) total_response = 0.0 # Calculate response for this allocation for channel, spend in allocation.items(): weekly_spend = spend / self.num_weeks weekly_response = self._predict_response(weekly_spend, channel) total_response += weekly_response * self.num_weeks comparison.append({ 'scenario': scenario_name, 'total_spend': total_spend, 'total_response': total_response, 'roi': total_response / total_spend if total_spend > 0 else 0, **allocation }) df = pd.DataFrame(comparison) return df
[docs] def optimize_budget_from_curves( budget: float, curve_params: pd.DataFrame, *, channel_col: str = 'channel', num_weeks: int = 52, constraints: Optional[Dict[str, Dict[str, float]]] = None, method: str = 'trust-constr' ) -> OptimizationResult: """ Convenience function to optimize budget directly from curve parameters DataFrame. This function is useful when you have response curve parameters in a DataFrame (e.g., from ResponseCurveFit fitted on multiple channels) and want to quickly run optimization without manually setting up the BudgetOptimizer. Parameters ---------- budget : float Total budget to allocate curve_params : pd.DataFrame DataFrame with response curve parameters. Required columns: channel, top, bottom, saturation, slope channel_col : str, default='channel' Name of the channel column in curve_params num_weeks : int, default=52 Number of weeks for planning horizon constraints : Dict[str, Dict[str, float]], optional Channel-specific constraints: {'channel': {'lower': min, 'upper': max}} method : str, default='trust-constr' Optimization method Returns ------- OptimizationResult Optimization results Examples -------- >>> # After fitting curves for multiple channels >>> curves_df = pd.DataFrame({ ... 'channel': ['TV', 'Search', 'Social'], ... 'top': [1000000, 800000, 600000], ... 'bottom': [0, 0, 0], ... 'saturation': [50000, 30000, 20000], ... 'slope': [1.5, 2.0, 1.8] ... }) >>> >>> result = optimize_budget_from_curves( ... budget=1000000, ... curve_params=curves_df, ... constraints={'TV': {'lower': 100000, 'upper': 600000}} ... ) >>> print(result.allocation) """ # Validate required columns required_cols = [channel_col, 'top', 'bottom', 'saturation', 'slope'] missing_cols = [col for col in required_cols if col not in curve_params.columns] if missing_cols: raise ValueError(f"curve_params missing required columns: {missing_cols}") # Convert DataFrame to dictionary format channels = curve_params[channel_col].tolist() response_curves = {} for _, row in curve_params.iterrows(): channel = row[channel_col] response_curves[channel] = { 'top': row['top'], 'bottom': row['bottom'], 'saturation': row['saturation'], 'slope': row['slope'] } # Create optimizer optimizer = BudgetOptimizer( budget=budget, channels=channels, response_curves=response_curves, num_weeks=num_weeks, method=method ) # Set constraints if provided if constraints: optimizer.set_constraints(constraints) # Run optimization return optimizer.optimize()