"""
Data preprocessing and loading utilities for DeepCausalMMM.
This module handles:
- Data loading and validation
- Bayesian Network creation
- Feature engineering (adstock, saturation)
- Data scaling and preparation
"""
import logging
import pandas as pd
import numpy as np
import torch
from typing import Dict, List, Tuple, Optional, Any
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from deepcausalmmm.exceptions import DataError, BayesianNetworkError, ValidationError
from deepcausalmmm.core.scaling import SimpleGlobalScaler
logger = logging.getLogger('deepcausalmmm')
# Try to import pgmpy, fallback to simpler approach if not available
try:
from pgmpy.estimators import HillClimbSearch, BicScore, MaximumLikelihoodEstimator
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.inference import VariableElimination
PGMPY_AVAILABLE = True
except ImportError:
PGMPY_AVAILABLE = False
[docs]
def validate_dataframe(df: pd.DataFrame, required_columns: List[str]) -> None:
"""
Validate that the dataframe contains required columns.
Args:
df: Input dataframe
required_columns: List of required column names
Raises:
ValidationError: If required columns are missing
"""
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValidationError(f"Missing required columns: {missing_columns}")
[docs]
def create_belief_vectors(
df: pd.DataFrame,
control_vars: List[str]
) -> Tuple[pd.DataFrame, Any]:
"""
Create belief vectors from control variables using Bayesian Network.
Args:
df: Input dataframe
control_vars: List of control variable names
Returns:
Tuple of (belief_vectors_df, bayesian_network_structure)
"""
if PGMPY_AVAILABLE and len(control_vars) > 0:
try:
# Use pgmpy for Bayesian Network
disc_ctrl = df[control_vars].astype(str)
bn_struct = HillClimbSearch(disc_ctrl).estimate(BicScore(disc_ctrl))
bn = DiscreteBayesianNetwork(bn_struct.edges())
bn.fit(disc_ctrl, MaximumLikelihoodEstimator)
bn_inf = VariableElimination(bn)
def belief(row):
beliefs = []
for v in control_vars:
evidence = {var: disc_ctrl.loc[row.name, var] for var in control_vars if var != v}
q = bn_inf.query(variables=[v], evidence=evidence, show_progress=False)
beliefs.append(np.argmax(q.values))
return beliefs
Z_ctrl = np.vstack(df.apply(belief, axis=1))
result_df = pd.DataFrame(Z_ctrl, columns=[f'belief_{i}' for i in range(Z_ctrl.shape[1])])
return result_df, bn_struct
except Exception as e:
logger.warning(f"Bayesian Network failed: {e}, using fallback")
# Fallback: use control variables directly
if len(control_vars) > 0:
Z_ctrl = df[control_vars].values
result_df = pd.DataFrame(Z_ctrl, columns=[f'belief_{i}' for i in range(Z_ctrl.shape[1])])
else:
# Create dummy control variables
Z_ctrl = np.random.randint(0, 5, (len(df), 15))
result_df = pd.DataFrame(Z_ctrl, columns=[f'belief_{i}' for i in range(15)])
# Create simple adjacency matrix for controls
n_ctrl = len(control_vars) if len(control_vars) > 0 else 15
bn_struct = type('MockStruct', (), {
'edges': lambda: [(f'control_{i}', f'control_{i+1}') for i in range(n_ctrl-1)]
})()
return result_df, bn_struct
[docs]
def prepare_data_for_training(
df: pd.DataFrame,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Prepare data for training with proper scaling and structure.
Args:
df: Input dataframe
params: Configuration parameters
Returns:
Dictionary containing prepared data and scalers
"""
burn_in = params.get("burn_in_weeks", 4)
df_work = df.copy()
# Get variable names from config
marketing_vars = params.get('marketing_vars', [])
control_vars = params.get('control_vars', [])
dependent_var = params.get('dependent_var', 'revenue')
region_var = params.get('region_var', None)
date_var = params.get('date_var', None)
# Auto-detect variables if not provided
if not marketing_vars:
marketing_vars = [col for col in df_work.columns
if any(keyword in col.lower() for keyword in
['spend', 'media', 'tv', 'digital', 'radio', 'social'])]
if not control_vars:
control_vars = [col for col in df_work.columns
if col not in marketing_vars + [dependent_var, region_var, date_var]
and col not in ['date', 'week', 'region']
and df_work[col].dtype in ['int64', 'float64']]
# Handle region creation/validation
if region_var and region_var in df_work.columns:
regions = df_work[region_var].unique()
logger.info(f"Using existing region column: {len(regions)} regions found")
else:
df_work['region'] = 'All_Data'
regions = df_work['region'].unique()
region_var = 'region'
logger.info(f"No region column specified, creating single region with {len(df_work)} rows")
# Add week if not exists
if 'week' not in df_work.columns:
if date_var and date_var in df_work.columns:
df_work['week'] = pd.to_datetime(df_work[date_var]).dt.isocalendar().week
else:
df_work['week'] = np.arange(len(df_work))
# Ensure target variable exists
if dependent_var not in df_work.columns:
if 'revenue' in df_work.columns:
dependent_var = 'revenue'
elif 'sales' in df_work.columns:
dependent_var = 'sales'
else:
raise DataError(f"Target variable '{dependent_var}' not found in data")
# Group data by region and ensure equal sequence lengths
region_data = {}
min_length = float('inf')
for region in regions:
region_df = df_work[df_work[region_var] == region].copy()
# Sort by time
if date_var and date_var in region_df.columns:
region_df = region_df.sort_values(date_var)
elif 'week' in region_df.columns:
region_df = region_df.sort_values('week')
region_data[region] = region_df
min_length = min(min_length, len(region_df))
# Truncate all regions to same length
for region in regions:
region_df_trimmed = region_data[region].iloc[:min_length]
# Add burn-in padding
if burn_in > 0:
pad_block = region_df_trimmed.iloc[:1].copy()
pad_block = pd.concat([pad_block] * burn_in, ignore_index=True)
# Add small jitter to media columns
for col in marketing_vars:
if col in pad_block.columns:
pad_block[col] += np.random.normal(0, 0.01, size=burn_in)
region_df_trimmed = pd.concat([pad_block, region_df_trimmed], ignore_index=True)
region_data[region] = region_df_trimmed
# Create standardized feature matrices
n_regions = len(regions)
n_time_steps = min_length + burn_in
n_media_target = len(marketing_vars)
n_control_target = max(len(control_vars), 1)
# Media variables matrix
media_matrix = np.zeros((n_regions, n_time_steps, n_media_target))
for i, region in enumerate(regions):
region_df = region_data[region]
for j, var in enumerate(marketing_vars):
if var in region_df.columns:
media_matrix[i, :, j] = region_df[var].values
# Control variables matrix
control_matrix = np.zeros((n_regions, n_time_steps, n_control_target))
for i, region in enumerate(regions):
region_df = region_data[region]
for j, var in enumerate(control_vars):
if var in region_df.columns:
control_matrix[i, :, j] = region_df[var].values
# If no control variables, create intercept
if len(control_vars) == 0:
control_matrix[i, :, 0] = 1.0
# Target variable matrix
y_matrix = np.zeros((n_regions, n_time_steps))
for i, region in enumerate(regions):
region_df = region_data[region]
y_matrix[i, :] = region_df[dependent_var].values
# Create region IDs
region_ids = np.arange(n_regions)
# Scale the data
media_scaler = MinMaxScaler()
media_matrix_flat = media_matrix.reshape(-1, n_media_target)
media_matrix_scaled = media_scaler.fit_transform(media_matrix_flat)
media_matrix_scaled = media_matrix_scaled.reshape(n_regions, n_time_steps, n_media_target)
control_scaler = MinMaxScaler()
control_matrix_flat = control_matrix.reshape(-1, n_control_target)
control_matrix_scaled = control_scaler.fit_transform(control_matrix_flat)
control_matrix_scaled = control_matrix_scaled.reshape(n_regions, n_time_steps, n_control_target)
y_scaler = MinMaxScaler()
y_matrix_flat = y_matrix.reshape(-1, 1)
y_matrix_scaled = y_scaler.fit_transform(y_matrix_flat)
y_matrix_scaled = y_matrix_scaled.reshape(n_regions, n_time_steps)
# Convert to tensors
X_m = torch.tensor(media_matrix_scaled, dtype=torch.float32)
X_c = torch.tensor(control_matrix_scaled, dtype=torch.float32)
y = torch.tensor(y_matrix_scaled, dtype=torch.float32)
R = torch.tensor(region_ids, dtype=torch.long)
logger.info(f"Data preparation complete:")
logger.info(f" - Regions: {n_regions}")
logger.info(f" - Time steps: {n_time_steps}")
logger.info(f" - Media variables: {len(marketing_vars)} → {n_media_target}")
logger.info(f" - Control variables: {len(control_vars)} → {n_control_target}")
logger.info(f" - Target variable: {dependent_var}")
logger.info(f" - Shapes: X_m={X_m.shape}, X_c={X_c.shape}, y={y.shape}, R={R.shape}")
return {
'X_m': X_m,
'X_c': X_c,
'R': R,
'y': y,
'burn_in': burn_in,
'media_scaler': media_scaler,
'control_scaler': control_scaler,
'y_scaler': y_scaler,
'marketing_vars': marketing_vars,
'control_vars': control_vars,
'dependent_var': dependent_var,
'regions': regions,
}
[docs]
def load_and_preprocess_data(
file_path: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Load data from file and preprocess for training.
Args:
file_path: Path to data file (CSV, Excel, etc.)
params: Configuration parameters
Returns:
Dictionary containing prepared data and metadata
"""
# Load data
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_path)
else:
raise DataError(f"Unsupported file format: {file_path}")
# Validate data
required_columns = params.get('required_columns', ['revenue'])
validate_dataframe(df, required_columns)
# Prepare data
data_dict = prepare_data_for_training(df, params)
# Create Bayesian Network structure
Z_ctrl, bn_struct = create_belief_vectors(df, data_dict['control_vars'])
A_media = create_media_adjacency(data_dict['marketing_vars'], bn_struct)
data_dict.update({
'belief_vectors': Z_ctrl,
'bayesian_network': bn_struct,
'media_adjacency': A_media,
'original_dataframe': df,
})
return data_dict
[docs]
class UnifiedDataPipeline:
"""
Unified data processing pipeline for DeepCausalMMM models.
This pipeline ensures consistent data transformations between training and holdout
datasets, implementing the complete preprocessing workflow required for MMM analysis.
It handles temporal splitting, multi-scale normalization, seasonal decomposition,
and tensor preparation for PyTorch models.
Key Features:
- Temporal train/holdout splitting (respects time series nature)
- SOV (Share of Voice) scaling for media channels
- Z-score normalization for control variables
- Min-Max scaling for seasonal components (per region)
- Burn-in padding for GRU stabilization
- Automatic tensor conversion and device handling
- Inverse transformation utilities for interpretation
- Region encoding and validation
The pipeline maintains data integrity by:
- Using the same scaler fit on training data for holdout
- Preserving temporal order in all transformations
- Handling missing values and outliers appropriately
- Ensuring consistent tensor shapes across regions
Parameters
----------
config : Dict[str, Any]
Configuration dictionary containing:
- 'holdout_ratio': Fraction of data for holdout (default 0.08)
- 'burn_in_weeks': Number of weeks for padding (default 6)
- 'random_seed': Seed for reproducible operations (default 42)
- Media channel names, control variable names, etc.
Attributes
----------
scaler : SimpleGlobalScaler
Fitted scaler for consistent transformations
seasonal_detector : DetectSeasonality
Seasonal decomposition utility
media_columns : List[str]
Names of media channel columns
control_columns : List[str]
Names of control variable columns
region_column : str
Name of region identifier column
target_column : str
Name of target variable column
Examples
--------
>>> import pandas as pd
>>> from deepcausalmmm.core.data import UnifiedDataPipeline
>>> from deepcausalmmm.core.config import get_default_config
>>>
>>> # Load your MMM dataset
>>> df = pd.read_csv('mmm_data.csv')
>>> config = get_default_config()
>>>
>>> # Initialize and fit pipeline
>>> pipeline = UnifiedDataPipeline(config)
>>> processed_data = pipeline.fit_transform(df)
>>>
>>> # Access processed tensors
>>> X_media_train = processed_data['X_media_train']
>>> y_train = processed_data['y_train']
>>>
>>> # Get holdout data
>>> X_media_holdout = processed_data['X_media_holdout']
>>> y_holdout = processed_data['y_holdout']
>>>
>>> print(f"Training shape: {X_media_train.shape}")
>>> print(f"Holdout shape: {X_media_holdout.shape}")
"""
[docs]
def __init__(self, config: Dict[str, Any]):
"""
Initialize the unified data pipeline.
Args:
config: Configuration dictionary with all parameters
"""
self.config = config
self.scaler = None
self.padding_weeks = config.get('burn_in_weeks', 20)
self.fitted = False
[docs]
def temporal_split(self,
X_media: np.ndarray,
X_control: np.ndarray,
y: np.ndarray,
holdout_ratio: Optional[float] = None) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
"""
Perform time series split of data using ratio-based approach.
This ensures adequate holdout data regardless of burn-in weeks.
Args:
X_media: Media data [regions, weeks, channels]
X_control: Control data [regions, weeks, controls]
y: Target data [regions, weeks]
holdout_ratio: Fraction of data for holdout (uses config if None)
Returns:
Tuple of (train_data_dict, holdout_data_dict)
"""
n_weeks = X_media.shape[1]
# Use config holdout_ratio if not provided
if holdout_ratio is None:
holdout_ratio = self.config.get('holdout_ratio', 0.27)
# Calculate split point using ratio
holdout_weeks = int(n_weeks * holdout_ratio)
train_weeks = n_weeks - holdout_weeks
self.train_weeks = train_weeks # Store for holdout seasonality alignment
# Ensure we have reasonable amounts of data for both splits
min_train_weeks = self.config.get('min_train_weeks', 60)
burn_in_weeks = self.config.get('burn_in_weeks', 20)
if train_weeks < min_train_weeks:
raise ValueError(f"Not enough training data: need at least {min_train_weeks} training weeks, "
f"but only have {train_weeks} with {holdout_ratio:.1%} holdout ratio")
if holdout_weeks < burn_in_weeks + 5: # Warn if holdout is small (though burn-in is added as padding, not removed)
logger.warning(f" WARNING: Holdout weeks ({holdout_weeks}) is small relative to burn-in weeks ({burn_in_weeks})")
logger.warning(f" Note: Burn-in is added as padding, all {holdout_weeks} actual weeks will be evaluated")
# Time series split: train on first weeks, test on last weeks
# Training data (chronologically first)
train_data = {
'X_media': X_media[:, :train_weeks, :].astype(np.float32),
'X_control': X_control[:, :train_weeks, :].astype(np.float32),
'y': y[:, :train_weeks].astype(np.float32)
}
# Holdout data (chronologically last - most recent weeks)
holdout_data = {
'X_media': X_media[:, train_weeks:, :].astype(np.float32),
'X_control': X_control[:, train_weeks:, :].astype(np.float32),
'y': y[:, train_weeks:].astype(np.float32)
}
logger.info(f"\n Unified Data Pipeline - Time Series Split (Ratio-Based):")
logger.info(f" Training: {train_weeks} actual weeks (weeks 1-{train_weeks}) - {train_weeks/n_weeks*100:.1f}%")
logger.info(f" Holdout: {holdout_weeks} actual weeks (weeks {train_weeks+1}-{n_weeks}) - {holdout_weeks/n_weeks*100:.1f}%")
logger.info(f" Burn-in Padding: {burn_in_weeks} weeks will be added to BOTH train and holdout")
logger.info(f" Model sees: Train {train_weeks + burn_in_weeks} weeks, Holdout {holdout_weeks + burn_in_weeks} weeks")
logger.info(f" Evaluation: Remove {burn_in_weeks} padding weeks, evaluate on ALL actual data")
logger.info(f" Time series approach: Training on historical data, testing on most recent data")
return train_data, holdout_data
def _add_seasonality_features(self, X_control: torch.Tensor, start_week: int = 0) -> torch.Tensor:
"""
UPDATED: Seasonality is now handled by the model using actual data decomposition.
This method now returns control variables unchanged.
"""
logger.info(" Seasonality now handled by model's data-driven seasonal decomposition (not as control variable)")
return X_control # Return unchanged - no artificial seasonality added
def _add_padding(self,
X_media: torch.Tensor,
X_control: torch.Tensor,
y: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Add consistent padding to all data splits.
Args:
X_media: Media tensor [regions, weeks, channels]
X_control: Control tensor [regions, weeks, controls]
y: Target tensor [regions, weeks]
Returns:
Tuple of padded tensors
"""
# Create padding tensors
media_padding = torch.zeros(X_media.shape[0], self.padding_weeks, X_media.shape[2])
control_padding = torch.zeros(X_control.shape[0], self.padding_weeks, X_control.shape[2])
y_padding = torch.zeros(y.shape[0], self.padding_weeks)
# Add padding to the beginning
X_media_padded = torch.cat([media_padding, X_media], dim=1)
X_control_padded = torch.cat([control_padding, X_control], dim=1)
y_padded = torch.cat([y_padding, y], dim=1)
return X_media_padded, X_control_padded, y_padded
[docs]
def get_evaluation_data(self,
y_true_padded: torch.Tensor,
y_pred_padded: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Extract evaluation data (removing burn-in padding).
Args:
y_true_padded: True values with padding
y_pred_padded: Predicted values with padding
Returns:
Tuple of (y_true_eval, y_pred_eval) without padding
"""
y_true_eval = y_true_padded[:, self.padding_weeks:].contiguous()
y_pred_eval = y_pred_padded[:, self.padding_weeks:].contiguous()
return y_true_eval, y_pred_eval
[docs]
def get_scaler(self) -> SimpleGlobalScaler:
"""
Get the fitted scaler for external use.
Returns:
Fitted SimpleGlobalScaler instance
"""
if not self.fitted:
raise ValueError("Pipeline must be fitted first")
return self.scaler
[docs]
def predict_and_postprocess(self,
model,
X_media: np.ndarray,
X_control: np.ndarray,
channel_names: List[str],
control_names: List[str],
combine_with_holdout: bool = True) -> Dict[str, Any]:
"""
Generate predictions and contributions using the unified pipeline.
Args:
model: Trained model
X_media: Media data (full dataset for contributions)
X_control: Control data (full dataset for contributions)
channel_names: Media channel names
control_names: Control variable names
combine_with_holdout: Whether to combine train+holdout for contributions
Returns:
Dictionary with predictions, contributions, and metadata
"""
if not self.fitted:
raise ValueError("Pipeline must be fitted first")
logger.info(f"\n Unified Post-Processing:")
# 1. Process full dataset for contributions (if requested)
if combine_with_holdout:
logger.info(f" Processing full dataset (train + holdout) for contributions...")
# CRITICAL FIX: Add seasonality features with PROPER temporal alignment
# Must match how training/holdout data was processed during training
logger.info(f" Adding seasonality features to full dataset with proper temporal alignment...")
X_control_with_seasonality_raw = self._add_seasonality_features(
torch.tensor(X_control), start_week=0 # This is correct for full dataset
)
# Transform full data WITH seasonality using fitted scaler
X_media_scaled, X_control_with_seasonality, _ = self.scaler.transform(
X_media, X_control_with_seasonality_raw.numpy(), np.zeros_like(X_media[:, :, 0]) # Dummy y for consistency
)
# Add padding
X_media_padded, X_control_padded, _ = self._add_padding(
X_media_scaled, X_control_with_seasonality, torch.zeros(X_media_scaled.shape[0], X_media_scaled.shape[1])
)
# Create region tensor
n_regions = X_media_padded.shape[0]
R = torch.arange(n_regions, dtype=torch.long)
# Get model predictions and contributions
model.eval()
with torch.no_grad():
y_pred_scaled, _, media_contributions, outputs = model(
X_media_padded, X_control_padded, R
)
control_contributions = outputs['control_contributions']
# Remove padding from predictions and contributions
y_pred_eval = y_pred_scaled[:, self.padding_weeks:]
media_contrib_eval = media_contributions[:, self.padding_weeks:, :]
control_contrib_eval = control_contributions[:, self.padding_weeks:, :]
# Inverse transform predictions to original scale
y_pred_orig = self.scaler.inverse_transform_target(y_pred_eval)
logger.info(f" Full dataset processed: {X_media.shape[0]} regions × {X_media.shape[1]} weeks")
logger.info(f" Predictions shape: {y_pred_orig.shape}")
logger.info(f" Media contributions shape: {media_contrib_eval.shape}")
logger.info(f" Control contributions shape: {control_contrib_eval.shape}")
else:
# Use only training data
y_pred_orig = None
media_contrib_eval = None
control_contrib_eval = None
outputs = {}
# 2. Prepare results dictionary
results = {
'predictions': y_pred_orig,
'media_contributions': media_contrib_eval,
'control_contributions': control_contrib_eval,
'channel_names': channel_names,
'control_names': control_names,
'model_outputs': outputs,
'scaler': self.scaler,
'config': self.config,
'padding_weeks': self.padding_weeks
}
return results
[docs]
def calculate_metrics(self,
y_true: torch.Tensor,
y_pred: torch.Tensor,
prefix: str = "") -> Dict[str, float]:
"""
Calculate comprehensive metrics for model evaluation.
Args:
y_true: True values
y_pred: Predicted values
prefix: Prefix for metric names (e.g., 'train_', 'holdout_')
Returns:
Dictionary of metrics
"""
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import numpy as np
# Convert to numpy if needed
if isinstance(y_true, torch.Tensor):
y_true_np = y_true.numpy().flatten()
else:
y_true_np = np.array(y_true).flatten()
if isinstance(y_pred, torch.Tensor):
y_pred_np = y_pred.numpy().flatten()
else:
y_pred_np = np.array(y_pred).flatten()
# Calculate metrics
r2 = r2_score(y_true_np, y_pred_np)
rmse = np.sqrt(mean_squared_error(y_true_np, y_pred_np))
mae = mean_absolute_error(y_true_np, y_pred_np)
# Relative metrics
y_mean = np.mean(y_true_np)
relative_rmse = (rmse / y_mean) * 100 if y_mean != 0 else 0
relative_mae = (mae / y_mean) * 100 if y_mean != 0 else 0
return {
f'{prefix}r2': r2,
f'{prefix}rmse': rmse,
f'{prefix}mae': mae,
f'{prefix}relative_rmse': relative_rmse,
f'{prefix}relative_mae': relative_mae,
f'{prefix}mean': y_mean
}
[docs]
def get_processed_full_data(self):
"""
Get the processed full dataset (train + holdout) with all transformations applied.
This includes seasonality features, scaling, and padding - exactly as the model expects.
Returns:
Dictionary containing processed X_media and X_control tensors
"""
if not hasattr(self, 'train_tensors') or not hasattr(self, 'holdout_tensors'):
raise ValueError("Pipeline must be fitted and holdout data processed before getting full data")
# Combine train and holdout data (both already include padding and seasonality)
X_media_full = torch.cat([
self.train_tensors['X_media'], # [n_regions, train_weeks + padding, n_media]
self.holdout_tensors['X_media'] # [n_regions, holdout_weeks + padding, n_media]
], dim=1)
X_control_full = torch.cat([
self.train_tensors['X_control'], # [n_regions, train_weeks + padding, n_control + seasonality]
self.holdout_tensors['X_control'] # [n_regions, holdout_weeks + padding, n_control + seasonality]
], dim=1)
logger.info(f" Combined full dataset:")
logger.info(f" Media: {X_media_full.shape}")
logger.info(f" Control: {X_control_full.shape} (includes seasonality)")
return {
'X_media': X_media_full.numpy(),
'X_control': X_control_full.numpy()
}