Data Processing

Unified Data Pipeline

Data preprocessing and loading utilities for DeepCausalMMM.

This module handles:
  • Data loading and validation

  • Bayesian Network creation

  • Feature engineering (adstock, saturation)

  • Data scaling and preparation

deepcausalmmm.core.data.validate_dataframe(df: DataFrame, required_columns: List[str]) → None

Validate that the dataframe contains required columns.

Parameters:
  • df – Input dataframe

  • required_columns – List of required column names

Raises:

ValidationError – If required columns are missing
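
The check described above can be illustrated with a minimal sketch that reimplements the idea with pandas. It is not the library's source: the local ValidationError class, the function name validate_dataframe_sketch, the error message, and the column names are all assumptions made for the example.

```python
from typing import List

import pandas as pd


class ValidationError(Exception):
    """Stand-in for the library's ValidationError (assumed to be an Exception subclass)."""


def validate_dataframe_sketch(df: pd.DataFrame, required_columns: List[str]) -> None:
    """Raise ValidationError if any required column is absent from df."""
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise ValidationError(f"Missing required columns: {missing}")


# Hypothetical column names, for illustration only.
df = pd.DataFrame({"week": [1, 2], "tv_spend": [10.0, 12.0]})
validate_dataframe_sketch(df, ["week", "tv_spend"])  # passes silently

try:
    validate_dataframe_sketch(df, ["week", "sales"])
    error_message = None
except ValidationError as exc:
    error_message = str(exc)
```

Collecting every missing column before raising, rather than stopping at the first one, lets a caller fix the input file in a single pass.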

deepcausalmmm.core.data.create_belief_vectors(df: DataFrame, control_vars: List[str]) → Tuple[DataFrame, Any]

Create belief vectors from control variables using Bayesian Network.

Parameters:
  • df – Input dataframe

  • control_vars – List of control variable names

Returns:

Tuple of (belief_vectors_df, bayesian_network_structure)

deepcausalmmm.core.data.create_media_adjacency(media_vars: List[str], bn_struct: Any | None = None) → Tensor

Create adjacency matrix for media variables.

Parameters:
  • media_vars – List of media variable names

  • bn_struct – Bayesian network structure (optional)

Returns:

Adjacency matrix as torch tensor
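
The format of bn_struct is not documented here, so the following is only a sketch of how an adjacency matrix over media variables might be built. It assumes the structure is available as a list of (parent, child) name pairs, returns a NumPy array rather than a torch tensor, and returns an all-zero matrix when no structure is given. The function name and channel names are hypothetical.

```python
from typing import List, Optional, Sequence, Tuple

import numpy as np


def media_adjacency_sketch(
    media_vars: List[str],
    edges: Optional[Sequence[Tuple[str, str]]] = None,
) -> np.ndarray:
    """Directed adjacency matrix: entry [i, j] is 1.0 if media_vars[i] -> media_vars[j]."""
    n = len(media_vars)
    index = {name: i for i, name in enumerate(media_vars)}
    adj = np.zeros((n, n), dtype=np.float32)
    if edges is None:
        # Assumption: with no structure supplied, no cross-channel links are encoded.
        return adj
    for parent, child in edges:
        # Skip edges that touch variables outside the media set (e.g. controls).
        if parent in index and child in index:
            adj[index[parent], index[child]] = 1.0
    return adj


channels = ["tv", "search", "social"]  # hypothetical channel names
adj = media_adjacency_sketch(channels, [("tv", "search"), ("tv", "social"), ("price", "tv")])
```

If a tensor is needed, torch.from_numpy(adj) converts the array without copying.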

deepcausalmmm.core.data.prepare_data_for_training(df: DataFrame, params: Dict[str, Any]) → Dict[str, Any]

Prepare data for training with proper scaling and structure.

Parameters:
  • df – Input dataframe

  • params – Configuration parameters

Returns:

Dictionary containing prepared data and scalers

deepcausalmmm.core.data.load_and_preprocess_data(file_path: str, params: Dict[str, Any]) → Dict[str, Any]

Load data from file and preprocess for training.

Parameters:
  • file_path – Path to data file (CSV, Excel, etc.)

  • params – Configuration parameters

Returns:

Dictionary containing prepared data and metadata

class deepcausalmmm.core.data.UnifiedDataPipeline(config: Dict[str, Any])

Bases: object

Unified data processing pipeline for DeepCausalMMM models.

This pipeline ensures consistent data transformations between training and holdout datasets, implementing the complete preprocessing workflow required for MMM analysis. It handles temporal splitting, multi-scale normalization, seasonal decomposition, and tensor preparation for PyTorch models.

Key Features:
  • Temporal train/holdout splitting (respects time series nature)

  • SOV (Share of Voice) scaling for media channels

  • Z-score normalization for control variables

  • Min-Max scaling for seasonal components (per region)

  • Burn-in padding for GRU stabilization

  • Automatic tensor conversion and device handling

  • Inverse transformation utilities for interpretation

  • Region encoding and validation
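
Two of the scaling steps listed above can be sketched with NumPy. The exact formulas the pipeline uses are not given here, so this example assumes that share of voice means each channel's share of the total across channels in a region-week, and that Min-Max scaling maps each region's seasonal series to [0, 1] independently. All values are made up.

```python
import numpy as np

# Toy media array with shape [regions, weeks, channels].
X_media = np.array(
    [
        [[10.0, 30.0], [20.0, 20.0], [0.0, 0.0]],
        [[5.0, 15.0], [40.0, 10.0], [25.0, 25.0]],
    ]
)

# Share of voice: channel value divided by the total across channels.
# Region-weeks with no activity at all keep a share of zero.
totals = X_media.sum(axis=-1, keepdims=True)
sov = np.divide(X_media, totals, out=np.zeros_like(X_media), where=totals > 0)

# Toy seasonal component with shape [regions, weeks].
seasonal = np.array([[2.0, 4.0, 6.0], [10.0, 10.0, 30.0]])

# Min-Max scaling per region: each row is mapped to [0, 1] on its own range.
lo = seasonal.min(axis=1, keepdims=True)
hi = seasonal.max(axis=1, keepdims=True)
span = np.where(hi > lo, hi - lo, 1.0)  # avoid dividing by zero for flat series
seasonal_scaled = (seasonal - lo) / span
```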

The pipeline maintains data integrity by:
  • Using the same scaler fit on training data for holdout

  • Preserving temporal order in all transformations

  • Handling missing values and outliers appropriately

  • Ensuring consistent tensor shapes across regions
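
The first point, reusing training statistics on the holdout set, is the one most often gotten wrong in practice. The sketch below shows the pattern for Z-score normalization of control variables. It assumes one mean and standard deviation per control variable, pooled over regions and training weeks; the pipeline's own scaler may group the statistics differently.

```python
import numpy as np

rng = np.random.default_rng(42)
# Toy control data with shape [regions, weeks, controls].
X_control = rng.normal(loc=50.0, scale=10.0, size=(3, 20, 2))
train, holdout = X_control[:, :16, :], X_control[:, 16:, :]

# Fit: statistics come from the training weeks only.
mean = train.mean(axis=(0, 1), keepdims=True)
std = train.std(axis=(0, 1), keepdims=True)
std = np.where(std > 0, std, 1.0)  # guard against constant columns

# Transform: the same statistics are reused for both splits.
train_z = (train - mean) / std
holdout_z = (holdout - mean) / std
```

Fitting on the full series instead would leak information from the holdout weeks into the training inputs.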

Parameters:

config (Dict[str, Any]) – Configuration dictionary containing:
  • ‘holdout_ratio’: Fraction of data for holdout (default 0.08)

  • ‘burn_in_weeks’: Number of weeks for padding (default 6)

  • ‘random_seed’: Seed for reproducible operations (default 42)

  • Media channel names, control variable names, etc.
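
As a small illustration of the three documented keys, the sketch below builds a dictionary with their stated defaults and overrides one of them. A real configuration contains more entries (channel and control names, among others) that are not reproduced here. The 104-week dataset and the truncation to an integer week count are assumptions; the library may round differently.

```python
# Documented defaults for the three keys listed above.
documented_defaults = {
    "holdout_ratio": 0.08,
    "burn_in_weeks": 6,
    "random_seed": 42,
}

# Override one value without mutating the defaults.
config = {**documented_defaults, "holdout_ratio": 0.10}

# Rough size of the holdout window for a hypothetical 104-week dataset.
n_weeks = 104
holdout_weeks = int(n_weeks * config["holdout_ratio"])
```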

scaler

Fitted scaler for consistent transformations

Type:

SimpleGlobalScaler

seasonal_detector

Seasonal decomposition utility

Type:

DetectSeasonality

media_columns

Names of media channel columns

Type:

List[str]

control_columns

Names of control variable columns

Type:

List[str]

region_column

Name of region identifier column

Type:

str

target_column

Name of target variable column

Type:

str

Examples

>>> import pandas as pd
>>> from deepcausalmmm.core.data import UnifiedDataPipeline
>>> from deepcausalmmm.core.config import get_default_config
>>>
>>> # Load your MMM dataset
>>> df = pd.read_csv('mmm_data.csv')
>>> config = get_default_config()
>>>
>>> # Initialize and fit pipeline
>>> pipeline = UnifiedDataPipeline(config)
>>> processed_data = pipeline.fit_transform(df)
>>>
>>> # Access processed tensors
>>> X_media_train = processed_data['X_media_train']
>>> y_train = processed_data['y_train']
>>>
>>> # Get holdout data
>>> X_media_holdout = processed_data['X_media_holdout']
>>> y_holdout = processed_data['y_holdout']
>>>
>>> print(f"Training shape: {X_media_train.shape}")
>>> print(f"Holdout shape: {X_media_holdout.shape}")

__init__(config: Dict[str, Any])

Initialize the unified data pipeline.

Parameters:

config – Configuration dictionary with all parameters

temporal_split(X_media: ndarray, X_control: ndarray, y: ndarray, holdout_ratio: float | None = None) → Tuple[Dict[str, ndarray], Dict[str, ndarray]]

Perform a time-series split of the data using a ratio-based approach. This ensures adequate holdout data regardless of the number of burn-in weeks.

Parameters:
  • X_media – Media data [regions, weeks, channels]

  • X_control – Control data [regions, weeks, controls]

  • y – Target data [regions, weeks]

  • holdout_ratio – Fraction of data for holdout (uses config if None)

Returns:

Tuple of (train_data_dict, holdout_data_dict)
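
A ratio-based temporal split over arrays shaped as described above could look like the following sketch. The dictionary keys, the function name, the truncation to an integer week count, and the minimum of one holdout week are assumptions, not the library's documented behaviour.

```python
from typing import Dict, Tuple

import numpy as np


def temporal_split_sketch(
    X_media: np.ndarray,
    X_control: np.ndarray,
    y: np.ndarray,
    holdout_ratio: float = 0.08,
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
    """Split along the weeks axis so the holdout is always the most recent block."""
    n_weeks = y.shape[1]
    # Assumption: at least one holdout week, count truncated to an integer.
    n_holdout = max(1, int(n_weeks * holdout_ratio))
    cut = n_weeks - n_holdout
    train = {"X_media": X_media[:, :cut], "X_control": X_control[:, :cut], "y": y[:, :cut]}
    holdout = {"X_media": X_media[:, cut:], "X_control": X_control[:, cut:], "y": y[:, cut:]}
    return train, holdout


# Toy data: 2 regions, 50 weeks, 3 channels, 2 controls.
X_media = np.zeros((2, 50, 3))
X_control = np.zeros((2, 50, 2))
y = np.tile(np.arange(50.0), (2, 1))  # week index as the target, to make the order visible

train, holdout = temporal_split_sketch(X_media, X_control, y, holdout_ratio=0.08)
```

Because the cut is a single index on the weeks axis, every training week precedes every holdout week in all regions.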

fit_and_transform_training(train_data: Dict[str, ndarray]) → Dict[str, Tensor]

Fit scaler on training data and transform it.

Parameters:

train_data – Dictionary with training data arrays

Returns:

Dictionary with transformed and padded tensors
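
How the padding weeks are filled is not stated here. One plausible scheme, shown below purely as an assumption, is to prepend copies of the first observed week so that a recurrent model has a few steps to settle before the real series starts. The helper name add_burn_in_sketch is hypothetical.

```python
import numpy as np


def add_burn_in_sketch(x: np.ndarray, burn_in_weeks: int = 6) -> np.ndarray:
    """Prepend burn_in_weeks copies of the first week along axis 1 (the weeks axis)."""
    pad_width = [(0, 0)] * x.ndim
    pad_width[1] = (burn_in_weeks, 0)  # pad only at the start of the weeks axis
    return np.pad(x, pad_width, mode="edge")


X_media = np.arange(2 * 4 * 3, dtype=float).reshape(2, 4, 3)  # [regions, weeks, channels]
y = np.arange(2 * 4, dtype=float).reshape(2, 4)               # [regions, weeks]

X_media_padded = add_burn_in_sketch(X_media)
y_padded = add_burn_in_sketch(y)
```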

transform_holdout(holdout_data: Dict[str, ndarray]) → Dict[str, Tensor]

Transform holdout data using the fitted scaler (same transformations as training).

Parameters:

holdout_data – Dictionary with holdout data arrays

Returns:

Dictionary with transformed and padded tensors

inverse_transform_predictions(y_pred_scaled: Tensor, remove_padding: bool = True) → Tensor

Inverse transform predictions to original scale.

Parameters:
  • y_pred_scaled – Predictions in scaled space

  • remove_padding – Whether to remove padding weeks

Returns:

Predictions in original scale
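
The round trip from scaled predictions back to the original scale can be sketched as follows. The target scaling used by SimpleGlobalScaler is not described here, so this example assumes a simple per-region mean scaling and edge-style burn-in padding; both are illustrative choices, and the function name is hypothetical.

```python
import numpy as np

burn_in_weeks = 2  # small value for readability; the documented default is 6

# Original-scale target, shape [regions, weeks].
y = np.array([[100.0, 120.0, 80.0], [10.0, 30.0, 20.0]])

# Assumed forward transform: divide each region by its own mean.
region_mean = y.mean(axis=1, keepdims=True)
y_scaled = y / region_mean

# Pretend the model output is the scaled target preceded by burn-in weeks.
y_pred_scaled = np.pad(y_scaled, [(0, 0), (burn_in_weeks, 0)], mode="edge")


def inverse_transform_sketch(pred_scaled, region_mean, burn_in_weeks, remove_padding=True):
    """Undo the assumed mean scaling, optionally dropping the burn-in weeks first."""
    if remove_padding:
        pred_scaled = pred_scaled[:, burn_in_weeks:]
    return pred_scaled * region_mean


y_pred = inverse_transform_sketch(y_pred_scaled, region_mean, burn_in_weeks)
```

With remove_padding=False the result keeps the burn-in weeks, which is useful only when the comparison target is padded in the same way.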

get_evaluation_data(y_true_padded: Tensor, y_pred_padded: Tensor) → Tuple[Tensor, Tensor]

Extract evaluation data (removing burn-in padding).

Parameters:
  • y_true_padded – True values with padding

  • y_pred_padded – Predicted values with padding

Returns:

Tuple of (y_true_eval, y_pred_eval) without padding

inverse_transform_contributions(media_contributions: Tensor, y_true: Tensor) → Tensor

Inverse transform media contributions to original scale.

Parameters:
  • media_contributions – Media contributions in scaled space

  • y_true – True values in original scale (for scaling reference)

Returns:

Media contributions in original scale

get_scaler() → SimpleGlobalScaler

Get the fitted scaler for external use.

Returns:

Fitted SimpleGlobalScaler instance

predict_and_postprocess(model, X_media: ndarray, X_control: ndarray, channel_names: List[str], control_names: List[str], combine_with_holdout: bool = True) → Dict[str, Any]

Generate predictions and contributions using the unified pipeline.

Parameters:
  • model – Trained model

  • X_media – Media data (full dataset for contributions)

  • X_control – Control data (full dataset for contributions)

  • channel_names – Media channel names

  • control_names – Control variable names

  • combine_with_holdout – Whether to combine train+holdout for contributions

Returns:

Dictionary with predictions, contributions, and metadata

calculate_metrics(y_true: Tensor, y_pred: Tensor, prefix: str = '') → Dict[str, float]

Calculate comprehensive metrics for model evaluation.

Parameters:
  • y_true – True values

  • y_pred – Predicted values

  • prefix – Prefix for metric names (e.g., ‘train_’, ‘holdout_’)

Returns:

Dictionary of metrics
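
The set of metrics returned is not listed here. As a sketch of how the prefix argument shapes the output, the example below computes three common regression metrics (RMSE, MAE, and R²) with NumPy. The metric selection, key names, and function name are assumptions.

```python
from typing import Dict

import numpy as np


def calculate_metrics_sketch(
    y_true: np.ndarray, y_pred: np.ndarray, prefix: str = ""
) -> Dict[str, float]:
    """Return RMSE, MAE and R^2 with each key prefixed, e.g. 'holdout_rmse'."""
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    residuals = y_true - y_pred
    ss_res = float(np.sum(residuals ** 2))
    ss_tot = float(np.sum((y_true - y_true.mean()) ** 2))
    return {
        f"{prefix}rmse": float(np.sqrt(np.mean(residuals ** 2))),
        f"{prefix}mae": float(np.mean(np.abs(residuals))),
        # R^2 is undefined for a constant target; report NaN in that case.
        f"{prefix}r2": 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan"),
    }


y_true = np.array([[1.0, 2.0], [3.0, 4.0]])
y_pred = np.array([[1.0, 2.0], [3.0, 6.0]])
metrics = calculate_metrics_sketch(y_true, y_pred, prefix="holdout_")
```

Prefixing the keys lets training and holdout metrics be merged into one dictionary without collisions.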

get_processed_full_data()

Get the processed full dataset (train + holdout) with all transformations applied. This includes seasonality features, scaling, and padding, exactly as the model expects.

Returns:

Dictionary containing processed X_media and X_control tensors