Helper Utilities#
- class mlsynth.utils.helperutils.IndexSet(labels: ndarray, label_to_idx: Dict[Any, int])#
Bases:
objectImmutable bidirectional index mapping between arbitrary labels and integer indices.
This class provides a lightweight utility for converting between human-readable unit/time labels and integer indices used in NumPy arrays.
- labels#
Ordered array of labels (e.g., unit IDs or time periods).
- Type:
np.ndarray
- from_labels(labels)#
Construct IndexSet from an iterable of labels.
- get_labels(indices)#
Convert integer indices to labels.
- get_index(labels)#
Convert labels to integer indices.
Notes
This structure is immutable (frozen=True).
Intended for consistent indexing across panel datasets.
- classmethod from_labels(labels: Iterable[Any]) IndexSet#
Construct an IndexSet from an iterable of labels.
- Parameters:
labels (Iterable[Any]) – Ordered sequence of unique identifiers.
- Returns:
IndexSet – Mapping object for label-index conversions.
- get_index(labels)#
Convert labels into integer indices.
- Parameters:
labels (array-like) – Input labels.
- Returns:
np.ndarray – Integer indices corresponding to input labels.
- get_labels(indices)#
Convert integer indices into corresponding labels.
- Parameters:
indices (array-like) – Integer indices.
- Returns:
np.ndarray – Corresponding labels.
- labels: ndarray#
- mlsynth.utils.helperutils.prenorm(input_array: ndarray, target: float = 100) ndarray#
Normalize a vector or matrix based on its last element or last row.
This function scales the input array input_array such that its last element (if input_array is 1D) or each column’s last element (if input_array is 2D, using the last row input_array[-1, :] for normalization factors) is scaled towards a target value. Specifically, each element input_array[i] (for 1D) or input_array[i, j] (for 2D) is transformed to (input_array[i] / input_array[-1]) * target or (input_array[i, j] / input_array[-1, j]) * target.
- Parameters:
input_array (np.ndarray) – Input array to be normalized. Can be 1-dimensional (vector) or 2-dimensional (matrix).
If 1D, shape (T,): Normalization is based on the last element input_array[-1].
If 2D, shape (T, N): Normalization is performed column-wise, based on the elements of the last row input_array[-1, :]. Each column input_array[:, j] is normalized using input_array[-1, j].
target (float, optional) – The target value to which the reference element(s) (last element or last row elements) are scaled. Default is 100.
- Returns:
np.ndarray – The normalized array, having the same shape as the input input_array.
- Raises:
MlsynthDataError – If any element used for normalization (i.e., input_array[-1] for 1D input, or any element in input_array[-1, :] for 2D input) is zero.
Examples
>>> # Example with a 1D array >>> x_1d = np.array([10, 20, 40, 50], dtype=float) >>> prenorm(x_1d, target=100) array([ 20., 40., 80., 100.])
>>> # Example with a 2D array (matrix) >>> x_2d = np.array([[1, 2, 3], [2, 4, 6], [4, 8, 12]], dtype=float) >>> # Last row is [4, 8, 12]. Target is 100. >>> # Col 0: [1, 2, 4] -> [1/4*100, 2/4*100, 4/4*100] = [25, 50, 100] >>> # Col 1: [2, 4, 8] -> [2/8*100, 4/8*100, 8/8*100] = [25, 50, 100] >>> # Col 2: [3, 6, 12] -> [3/12*100, 6/12*100, 12/12*100] = [25, 50, 100] >>> prenorm(x_2d, target=100) array([[ 25., 25., 25.], [ 50., 50., 50.], [100., 100., 100.]])
>>> # Example that raises MlsynthDataError >>> x_zero_1d = np.array([10, 0], dtype=float) >>> try: ... prenorm(x_zero_1d) ... except MlsynthDataError as e: ... print(e) Division by zero: Denominator for normalization is zero.
- mlsynth.utils.helperutils.sc_diagplot(config_list: List[Dict[str, Any]], save: str | None = None) None#
Generate diagnostic plots for synthetic control analyses.
For each configuration provided, this function plots the treated unit’s outcome trajectory against individual donor unit trajectories and the mean trajectory of all donor units. A vertical line indicates the start of the treatment period.
- Parameters:
config_list (List[Dict[str, Any]]) – A list of configuration dictionaries. Each dictionary defines settings for a single plot (or a subplot if multiple configs are provided). Each dictionary must contain the following keys, which are typically passed to an internal data preparation function (like dataprep):
- “df”pandas.DataFrame
The input panel data.
- “unitid”str
The name of the column in df that identifies the units (e.g., countries, firms).
- “time”str
The name of the column in df that identifies the time periods (e.g., years, quarters).
- “outcome”str
The name of the column in df representing the outcome variable to be plotted.
- “treat”Union[str, int, List[Union[str, int]]]
Identifier(s) for the treated unit(s). This is used by the internal data preparation function to distinguish treated units from donor units.
- “cohort”Optional[Union[str, int]], default None
If the data preparation step can result in multiple cohorts (e.g., different treatment start times), this key specifies which cohort’s data to use for the plot. If None and multiple cohorts are detected by dataprep, a ValueError is raised.
save (Optional[str], default None) – If provided, saves the plot to the specified file path instead of displaying it interactively. Accepts formats supported by matplotlib.pyplot.savefig (e.g., ‘.png’, ‘.pdf’, ‘.svg’).
- Returns:
None – This function displays a matplotlib plot or saves it to disk if save is specified.
- Raises:
MlsynthConfigError – If config_list is not a list. If multiple cohorts are detected in the data prepared from a config and the “cohort” key is not specified in that config.
Notes
This function uses a local import from .datautils import dataprep. This is done to avoid potential circular import issues if helperutils.py is imported by modules that dataprep might depend on indirectly. The plot styling is controlled by a predefined ubertheme.
Examples
>>> import pandas as pd >>> import numpy as np >>> from unittest.mock import patch >>> # Create sample data for the plot >>> data_dict = { ... 'year': [2000, 2001, 2002, 2000, 2001, 2002, 2000, 2001, 2002], ... 'country': ['A', 'A', 'A', 'B', 'B', 'B', 'C', 'C', 'C'], ... 'gdp': [1.0, 1.2, 1.0, 2.0, 2.1, 2.2, 1.5, 1.6, 1.7] ... } >>> df_sample = pd.DataFrame(data_dict) >>> >>> # Configuration for sc_diagplot >>> plot_config = [{ ... "df": df_sample, ... "unitid": "country", ... "time": "year", ... "outcome": "gdp", ... "treat": "A" # Unit 'A' is treated ... }] >>> >>> # Mock the output of dataprep for this example >>> # dataprep would normally process df_sample based on the config >>> mock_dataprep_output = { ... "y": np.array([1.0, 1.2, 1.0]), # Treated unit 'A' outcomes ... "donor_matrix": np.array([[2.0, 1.5], [2.1, 1.6], [2.2, 1.7]]), # Donors 'B', 'C' ... "treated_unit_name": "A", ... "pre_periods": 1, # Indicates treatment starts after the first period (index 0) ... "Ywide": pd.DataFrame(index=[2000, 2001, 2002]) # For time axis labels ... } >>> >>> # Patch dataprep and plt.show to run example non-interactively >>> with patch("mlsynth.utils.helperutils.dataprep", return_value=mock_dataprep_output), ... patch("matplotlib.pyplot.show"): ... sc_diagplot(plot_config) >>> # This would typically display a plot showing: >>> # - GDP of unit 'A' (black line). >>> # - GDP of donor units 'B' and 'C' (gray lines). >>> # - Mean GDP of donors 'B' and 'C' (blue line). >>> # - A vertical dashed line after year 2000, indicating treatment start. >>> # To save the figure instead: >>> sc_diagplot(plot_config, save="sc_diagplot_output.png")
- mlsynth.utils.helperutils.ssdid_est(treated_unit_outcomes_all_periods: ndarray, donor_units_outcomes_all_periods: ndarray, donor_weights: ndarray, time_weights_vector: ndarray, num_pre_treatment_periods: int, post_treatment_horizon_offset: int) float#
Compute the Synthetic Difference-in-Differences (SSDID) treatment effect estimate.
This function calculates the SSDID treatment effect for a specific post-treatment horizon post_treatment_horizon_offset. The estimate is derived by taking the difference between a post-treatment “gap” and a weighted pre-treatment “gap”.
The post-treatment gap is: post_treatment_gap = treated_unit_outcomes_all_periods[num_pre_treatment_periods + post_treatment_horizon_offset] - (donor_units_outcomes_all_periods[num_pre_treatment_periods + post_treatment_horizon_offset, :] @ donor_weights)
The weighted pre-treatment gap is: weighted_pre_treatment_gap = time_weights_vector @ (treated_unit_outcomes_all_periods[:num_pre_treatment_periods] - (donor_units_outcomes_all_periods[:num_pre_treatment_periods, :] @ donor_weights))
The SSDID estimate is post_treatment_gap - weighted_pre_treatment_gap.
- Parameters:
treated_unit_outcomes_all_periods (np.ndarray) – Outcome vector for the treated unit across all time periods. Shape (T_total,), where T_total is the total number of time periods.
donor_units_outcomes_all_periods (np.ndarray) – Matrix of outcomes for donor units across all time periods. Shape (T_total, J), where J is the number of donor units.
donor_weights (np.ndarray) – Donor weights used to create the synthetic control unit. Shape (J,). These weights typically sum to 1.
time_weights_vector (np.ndarray) – Time weights used to average the pre-treatment gaps. Shape (num_pre_treatment_periods,). These weights typically sum to 1.
num_pre_treatment_periods (int) – Number of pre-treatment periods. This defines the length of time_weights_vector and the segment of pre-treatment data used for calculating weighted_pre_treatment_gap.
post_treatment_horizon_offset (int) – Post-treatment horizon relative to num_pre_treatment_periods. The treatment effect is estimated for time period num_pre_treatment_periods + post_treatment_horizon_offset (0-indexed).
- Returns:
float – The estimated SSDID treatment effect at horizon post_treatment_horizon_offset.
- Raises:
MlsynthDataError – If input arrays have incorrect types, dimensions, or inconsistent shapes, or if period indices are out of bounds.
Examples
>>> T_total_ex, J_ex, num_pre_periods_ex, horizon_offset_ex = 20, 3, 10, 2 >>> treated_outcomes_ex = np.random.rand(T_total_ex) >>> donor_outcomes_ex = np.random.rand(T_total_ex, J_ex) >>> donor_weights_ex = np.array([0.5, 0.3, 0.2]) # Example donor weights >>> time_weights_ex = np.full(num_pre_periods_ex, 1/num_pre_periods_ex) # Example time weights (uniform) >>> att_k = ssdid_est(treated_outcomes_ex, donor_outcomes_ex, donor_weights_ex, time_weights_ex, num_pre_periods_ex, horizon_offset_ex) >>> print(f"SSDID estimate at horizon k={horizon_offset_ex}: {att_k:.3f}") SSDID estimate at horizon k=2: ...
- mlsynth.utils.helperutils.ssdid_lambda(treated_unit_outcomes_all_periods: ndarray, donor_units_outcomes_all_periods: ndarray, num_pre_treatment_periods_for_lambda: int, post_treatment_horizon_offset: int, l2_penalty_regularization_strength: float) Tuple[ndarray, float]#
Solve for time weights (lambda) and intercept for the SSDID method.
This function estimates time weights (optimal_time_weights) and an intercept (optimal_intercept) used in the Synthetic Difference-in-Differences (SSDID) method. These weights are optimized to make a weighted average of the treated unit’s pre-treatment outcomes match the average outcome of donor units at a specific post-treatment horizon post_treatment_horizon_offset.
The objective function is: min_{lambda, lambda_0} || treated_unit_outcomes_pre_treatment_for_lambda @ lambda - lambda_0 - target_donor_outcomes_at_horizon ||_2^2 + l2_penalty_regularization_strength^2 * ||lambda||_2^2 subject to sum(lambda) == 1. Here, target_donor_outcomes_at_horizon is donor_units_outcomes_all_periods[num_pre_treatment_periods_for_lambda + post_treatment_horizon_offset, :] (the average outcome of donor units at time num_pre_treatment_periods_for_lambda + post_treatment_horizon_offset).
- Parameters:
treated_unit_outcomes_all_periods (np.ndarray) – Outcome vector for the treated unit across all time periods. Shape (T_total,), where T_total is the total number of time periods. Only the first num_pre_treatment_periods_for_lambda periods are used in the optimization. (Formerly treated_y)
donor_units_outcomes_all_periods (np.ndarray) – Matrix of outcomes for donor units across all time periods. Shape (T_total, J), where J is the number of donor units. The row donor_units_outcomes_all_periods[num_pre_treatment_periods_for_lambda + post_treatment_horizon_offset, :] is used as the target for matching. (Formerly donor_matrix)
num_pre_treatment_periods_for_lambda (int) – Number of pre-treatment periods of treated_unit_outcomes_all_periods to use for constructing the weighted average. The time_weights_variable will have length num_pre_treatment_periods_for_lambda. (Formerly a)
post_treatment_horizon_offset (int) – Post-treatment horizon offset. The target for matching is the average donor outcome at time period num_pre_treatment_periods_for_lambda + post_treatment_horizon_offset. (Formerly k)
l2_penalty_regularization_strength (float) – Regularization parameter (non-negative) for the L2 penalty on the time weights time_weights_variable. A larger l2_penalty_regularization_strength imposes a stronger penalty. (Formerly eta)
- Returns:
Tuple[np.ndarray, float] – A tuple containing:
optimal_time_weights : np.ndarray The optimal time weights, shape (num_pre_treatment_periods_for_lambda,). These weights sum to 1. (Formerly lambda_val)
optimal_intercept : float The optimal intercept term. (Formerly lambda_0_val)
- Raises:
MlsynthDataError – If input arrays have incorrect types, dimensions, or inconsistent shapes.
MlsynthConfigError – If l2_penalty_regularization_strength is negative. If num_pre_treatment_periods_for_lambda or post_treatment_horizon_offset are negative or lead to invalid slice indices.
MlsynthEstimationError – If the CVXPY optimization solver fails.
Notes
The optimization is performed using CVXPY with the default solver.
target_donor_period_index = num_pre_treatment_periods_for_lambda + post_treatment_horizon_offset is the index into donor_units_outcomes_all_periods for the target observation.
Examples
>>> T_total_ex, J_ex = 20, 3 >>> num_pre_periods_lambda_ex, horizon_offset_ex, l2_penalty_ex = 10, 2, 0.05 # Target donor obs at time a+k = 12 >>> treated_outcomes_ex = np.random.rand(T_total_ex) >>> donor_outcomes_ex = np.random.rand(T_total_ex, J_ex) >>> opt_lambda_weights, opt_lambda_intercept = ssdid_lambda( ... treated_outcomes_ex, donor_outcomes_ex, num_pre_periods_lambda_ex, horizon_offset_ex, l2_penalty_ex ... ) >>> print(f"Lambda weights shape: {opt_lambda_weights.shape}") Lambda weights shape: (10,) >>> print(f"Sum of lambda weights: {np.sum(opt_lambda_weights):.2f}") Sum of lambda weights: 1.00 >>> print(f"Lambda intercept: {opt_lambda_intercept:.2f}") Lambda intercept: ...
- mlsynth.utils.helperutils.ssdid_w(treated_unit_outcomes_all_periods: ndarray, donor_units_outcomes_all_periods: ndarray, num_matching_pre_periods: int, matching_horizon_offset: int, l2_penalty_regularization_strength: float, donor_prior_weights_for_penalty: ndarray | None = None) Tuple[ndarray, float]#
Solve for synthetic control weights (omega) for the SSDID method.
This function estimates the donor weights (optimal_donor_weights) and an intercept term (optimal_intercept) that best match the pre-treatment trajectory of a treated unit. The optimization minimizes the sum of squared residuals between the treated unit’s outcomes and the weighted combination of donor outcomes, plus an L2 penalty on the donor weights. The weights are constrained to sum to 1.
The objective function is: min_{optimal_donor_weights, optimal_intercept} || donor_units_outcomes_matching_period @ optimal_donor_weights - optimal_intercept - treated_unit_outcomes_matching_period ||_2^2 + l2_penalty_regularization_strength^2 * sum(donor_prior_weights_for_penalty_j * optimal_donor_weights_j^2) subject to sum(optimal_donor_weights) == 1.
- Parameters:
treated_unit_outcomes_all_periods (np.ndarray) – Outcome vector for the treated unit across all time periods. Shape (T_total,), where T_total is the total number of time periods. (Formerly treated_y)
donor_units_outcomes_all_periods (np.ndarray) – Matrix of outcomes for donor units across all time periods. Shape (T_total, J), where J is the number of donor units. (Formerly donor_matrix)
num_matching_pre_periods (int) – Number of pre-treatment periods to use for matching the treated unit’s trajectory. The estimation uses data up to matching_period_end_index = num_matching_pre_periods + matching_horizon_offset. (Formerly a)
matching_horizon_offset (int) – Horizon parameter that defines the end of the matching period (matching_period_end_index). The outcomes from treated_unit_outcomes_all_periods[:matching_period_end_index] and donor_units_outcomes_all_periods[:matching_period_end_index, :] are used in the optimization. (Formerly k)
l2_penalty_regularization_strength (float) – Regularization parameter (non-negative) for the L2 penalty on the donor weights optimal_donor_weights. A larger l2_penalty_regularization_strength imposes a stronger penalty. (Formerly eta)
donor_prior_weights_for_penalty (Optional[np.ndarray], default None) – Prior weights for donor units, used in the L2 penalty term. Shape (J,). If None, uniform prior weights (1/J for each donor) are assumed. Default is None. (Formerly pi)
- Returns:
Tuple[np.ndarray, float] – A tuple containing:
optimal_donor_weights : np.ndarray The optimal donor weights, shape (J,). These weights sum to 1. (Formerly omega)
optimal_intercept : float The optimal intercept term. (Formerly omega_0)
- Raises:
MlsynthDataError – If input arrays have incorrect types, dimensions, or inconsistent shapes. If donor_prior_weights_for_penalty is provided and has an incorrect shape.
MlsynthConfigError – If l2_penalty_regularization_strength is negative. If num_matching_pre_periods or matching_horizon_offset are negative or lead to invalid slice indices.
MlsynthEstimationError – If the CVXPY optimization solver fails.
Notes
The optimization is performed using CVXPY with the default solver.
matching_period_end_index = num_matching_pre_periods + matching_horizon_offset defines the window of pre-treatment data used for estimation. (Formerly t_max = a + k)
Examples
>>> T_total_ex, J_ex = 20, 3 >>> num_matching_pre_periods_ex, matching_horizon_offset_ex, l2_penalty_ex = 10, 0, 0.1 >>> treated_outcomes_ex = np.random.rand(T_total_ex) >>> donor_outcomes_ex = np.random.rand(T_total_ex, J_ex) >>> opt_omega_vals, opt_omega_0_val = ssdid_w( ... treated_outcomes_ex, donor_outcomes_ex, num_matching_pre_periods_ex, matching_horizon_offset_ex, l2_penalty_ex ... ) >>> print(f"Omega weights shape: {opt_omega_vals.shape}") Omega weights shape: (3,) >>> print(f"Sum of omega weights: {np.sum(opt_omega_vals):.2f}") Sum of omega weights: 1.00 >>> print(f"Intercept omega_0: {opt_omega_0_val:.2f}") Intercept omega_0: ...