Source code for moderndid.didtriple.estimators.ddd_panel

"""Doubly robust DDD estimator for 2-period panel data."""

from __future__ import annotations

import warnings

import numpy as np
from scipy import stats

from moderndid.cupy.backend import get_backend, to_numpy

from ..bootstrap.mboot_ddd import mboot_ddd, wboot_ddd
from ..container import DDDPanelResult
from ..nuisance import compute_all_did, compute_all_nuisances


[docs] def ddd_panel( y1, y0, subgroup, covariates, i_weights=None, est_method="dr", boot=False, boot_type="multiplier", biters=1000, influence_func=False, alpha=0.05, random_state=None, ): r"""Compute the 2-period doubly robust DDD estimator for the ATT with panel data. Implements the triple difference-in-differences estimator from [1]_. The DDD design exploits three dimensions of variation: treatment status :math:`S` (treated vs untreated groups), eligibility :math:`Q` (eligible vs ineligible for treatment), and time (pre vs post-treatment periods). This allows for weaker identification assumptions than standard DiD by differencing out both treatment-specific and eligibility-specific heterogeneous trends. The target parameter is the Average Treatment Effect on the Treated (ATT) .. math:: ATT(2, 2) = \mathbb{E}[Y_2(2) - Y_2(\infty) \mid S=2, Q=1], where :math:`S=2` denotes units in the treatment-enabling group and :math:`Q=1` denotes eligibility for treatment. The doubly robust DDD estimand (Equation 3.5 from [1]_) combines three DiD comparisons .. math:: \widehat{ATT}_{\mathrm{dr}}(2,2) &= \mathbb{E}_n\left[ \left(\widehat{w}_{\mathrm{trt}}^{S=2,Q=1}(S,Q) - \widehat{w}_{\mathrm{comp}}^{S=2,Q=0}(S,Q,X)\right) \left(Y_2 - Y_1 - \widehat{m}_{Y_2-Y_1}^{S=2,Q=0}(X)\right)\right] \\ &+ \mathbb{E}_n\left[ \left(\widehat{w}_{\mathrm{trt}}^{S=2,Q=1}(S,Q) - \widehat{w}_{\mathrm{comp}}^{S=\infty,Q=1}(S,Q,X)\right) \left(Y_2 - Y_1 - \widehat{m}_{Y_2-Y_1}^{S=\infty,Q=1}(X)\right)\right] \\ &- \mathbb{E}_n\left[ \left(\widehat{w}_{\mathrm{trt}}^{S=2,Q=1}(S,Q) - \widehat{w}_{\mathrm{comp}}^{S=\infty,Q=0}(S,Q,X)\right) \left(Y_2 - Y_1 - \widehat{m}_{Y_2-Y_1}^{S=\infty,Q=0}(X)\right)\right], where the estimated weights are .. math:: \widehat{w}_{\mathrm{trt}}^{S=2,Q=1}(S,Q) \equiv \frac{\mathbf{1}\{S=2, Q=1\}}{\mathbb{E}_n[\mathbf{1}\{S=2, Q=1\}]}, \quad \widehat{w}_{\mathrm{comp}}^{S=g,Q=q}(S,Q,X) \equiv \frac{\frac{\mathbf{1}\{S=g, Q=q\} \cdot \widehat{p}^{S=2,Q=1}(X)} {\widehat{p}^{S=g,Q=q}(X)}} {\mathbb{E}_n\left[\frac{\mathbf{1}\{S=g, Q=q\} \cdot \widehat{p}^{S=2,Q=1}(X)} {\widehat{p}^{S=g,Q=q}(X)}\right]}. Parameters ---------- y1 : ndarray A 1D array of outcomes from the post-treatment period :math:`Y_t`. y0 : ndarray A 1D array of outcomes from the pre-treatment period :math:`Y_{g-1}`. subgroup : ndarray A 1D array of subgroup indicators (1, 2, 3, or 4) for each unit, corresponding to the four cells of the :math:`S \times Q` partition: - 4: :math:`S=g, Q=1` (Treated AND Eligible - target group) - 3: :math:`S=g, Q=0` (Treated BUT Ineligible) - 2: :math:`S=g_c, Q=1` (Eligible BUT Untreated) - 1: :math:`S=g_c, Q=0` (Untreated AND Ineligible) covariates : ndarray A 2D array of pre-treatment covariates :math:`X` for propensity score and outcome regression models. An intercept must be included if desired. i_weights : ndarray, optional A 1D array of observation weights. If None, weights are uniform. Weights are normalized to have a mean of 1. est_method : {"dr", "reg", "ipw"}, default "dr" Estimation method for each 2-period comparison. boot : bool, default False Whether to use bootstrap for inference. boot_type : {"multiplier", "weighted"}, default "multiplier" Type of bootstrap. Multiplier bootstrap uses Rademacher weights on the influence function; weighted bootstrap re-estimates with exponential weights. biters : int, default 1000 Number of bootstrap repetitions. influence_func : bool, default False Whether to return the influence function. alpha : float, default 0.05 Significance level for confidence intervals. random_state : int, Generator, or None, default None Controls random number generation for bootstrap reproducibility. Returns ------- DDDPanelResult A NamedTuple containing: - att: The DDD point estimate - se: Standard error - uci, lci: Confidence interval bounds - boots: Bootstrap draws (if requested) - att_inf_func: Influence function (if requested) - did_atts: Individual DiD ATT estimates for each comparison - subgroup_counts: Number of units in each subgroup - args: Estimation arguments See Also -------- ddd_mp : Multi-period DDD estimator for staggered adoption. Notes ----- The DDD estimator identifies the ATT under the DDD Conditional Parallel Trends assumption (DDD-CPT) from [1]_, which requires that, conditional on covariates :math:`X`, the difference in outcome trends between eligible and ineligible units is the same across treatment-enabling groups .. math:: &\mathbb{E}[Y_t(\infty) - Y_{t-1}(\infty) \mid S=g, Q=1, X] - \mathbb{E}[Y_t(\infty) - Y_{t-1}(\infty) \mid S=g, Q=0, X] \\ &= \mathbb{E}[Y_t(\infty) - Y_{t-1}(\infty) \mid S=g', Q=1, X] - \mathbb{E}[Y_t(\infty) - Y_{t-1}(\infty) \mid S=g', Q=0, X]. The DR estimator is consistent if, for each of the three DiD components, either the propensity score model or the outcome regression model is correctly specified. References ---------- .. [1] Ortiz-Villavicencio, M., & Sant'Anna, P. H. C. (2025). *Better Understanding Triple Differences Estimators.* arXiv preprint arXiv:2505.09942. https://arxiv.org/abs/2505.09942 """ xp = get_backend() y1, y0, subgroup, covariates, i_weights, n_units = _validate_inputs(xp, y1, y0, subgroup, covariates, i_weights) subgroup_counts = { "subgroup_1": int(xp.sum(subgroup == 1)), "subgroup_2": int(xp.sum(subgroup == 2)), "subgroup_3": int(xp.sum(subgroup == 3)), "subgroup_4": int(xp.sum(subgroup == 4)), } pscores, or_results = compute_all_nuisances( y1=y1, y0=y0, subgroup=subgroup, covariates=covariates, weights=i_weights, est_method=est_method, ) did_results, ddd_att, inf_func = compute_all_did( subgroup=subgroup, covariates=covariates, weights=i_weights, pscores=pscores, or_results=or_results, est_method=est_method, n_total=n_units, ) did_atts = { "att_4v3": did_results[0].dr_att, "att_4v2": did_results[1].dr_att, "att_4v1": did_results[2].dr_att, } inf_func = to_numpy(inf_func) ddd_att = float(ddd_att) dr_boot = None z_val = stats.norm.ppf(1 - alpha / 2) if not boot: se_ddd = np.std(inf_func, ddof=1) / np.sqrt(n_units) uci = ddd_att + z_val * se_ddd lci = ddd_att - z_val * se_ddd else: if boot_type == "multiplier": boot_result = mboot_ddd(inf_func, biters, alpha, random_state=random_state) dr_boot = boot_result.bres.flatten() se_ddd = boot_result.se[0] cv = boot_result.crit_val if np.isfinite(boot_result.crit_val) else z_val if np.isfinite(se_ddd) and se_ddd > 0: uci = ddd_att + cv * se_ddd lci = ddd_att - cv * se_ddd else: uci = lci = ddd_att warnings.warn("Bootstrap standard error is zero or NaN.", UserWarning) else: dr_boot = wboot_ddd( y1=y1, y0=y0, subgroup=subgroup, covariates=covariates, i_weights=i_weights, est_method=est_method, biters=biters, random_state=random_state, ) se_ddd = stats.iqr(dr_boot - ddd_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) if se_ddd > 0: cv = np.nanquantile(np.abs((dr_boot - ddd_att) / se_ddd), 1 - alpha) uci = ddd_att + cv * se_ddd lci = ddd_att - cv * se_ddd else: uci = lci = ddd_att warnings.warn("Bootstrap standard error is zero.", UserWarning) if not influence_func: inf_func = None args = { "panel": True, "est_method": est_method, "boot": boot, "boot_type": boot_type, "biters": biters, "alpha": alpha, } return DDDPanelResult( att=ddd_att, se=se_ddd, uci=uci, lci=lci, boots=dr_boot, att_inf_func=inf_func, did_atts=did_atts, subgroup_counts=subgroup_counts, args=args, )
def _validate_inputs(xp, y1, y0, subgroup, covariates, i_weights): """Validate and preprocess input arrays.""" y1 = xp.asarray(y1).flatten() y0 = xp.asarray(y0).flatten() subgroup = xp.asarray(subgroup).flatten() n_units = len(subgroup) if len(y1) != n_units or len(y0) != n_units: raise ValueError("y1, y0, and subgroup must have the same length.") if covariates is None: covariates = xp.ones((n_units, 1)) else: covariates = xp.asarray(covariates) if covariates.ndim == 1: covariates = covariates.reshape(-1, 1) if covariates.shape[0] != n_units: raise ValueError("covariates must have the same number of rows as subgroup.") if i_weights is None: i_weights = xp.ones(n_units) else: i_weights = xp.asarray(i_weights).flatten() if len(i_weights) != n_units: raise ValueError("i_weights must have the same length as subgroup.") if xp.any(i_weights < 0): raise ValueError("i_weights must be non-negative.") i_weights = i_weights / xp.mean(i_weights) unique_subgroups = set(int(v) for v in to_numpy(xp.unique(subgroup))) expected_subgroups = {1, 2, 3, 4} if not unique_subgroups.issubset(expected_subgroups): raise ValueError(f"subgroup must contain only values 1, 2, 3, 4. Got {unique_subgroups}.") if 4 not in unique_subgroups: raise ValueError("subgroup must contain at least one unit in subgroup 4 (treated-eligible).") for sg in [1, 2, 3]: if sg not in unique_subgroups: warnings.warn( f"No units in subgroup {sg}. DDD estimate may be unreliable.", UserWarning, ) return y1, y0, subgroup, covariates, i_weights, n_units