Source code for moderndid.drdid.estimators.drdid_imp_panel

"""Improved and locally efficient doubly robust DiD estimator for panel data."""

import warnings
from typing import NamedTuple

import numpy as np
from scipy import stats

from moderndid.cupy.backend import get_backend, to_numpy

from ..bootstrap.boot_mult import mboot_did
from ..bootstrap.boot_panel import wboot_drdid_imp_panel
from ..propensity.pscore_ipt import calculate_pscore_ipt
from .wols import wols_panel


class DRDIDPanelResult(NamedTuple):
    """Result from the drdid Panel estimator."""

    att: float
    se: float
    uci: float
    lci: float
    boots: np.ndarray | None
    att_inf_func: np.ndarray | None
    args: dict


[docs] def drdid_imp_panel( y1, y0, d, covariates, i_weights=None, boot=False, boot_type="weighted", nboot=999, influence_func=False, trim_level=0.995, ): r"""Compute the improved and locally efficient DR-DiD estimator for the ATT with panel data. Implements the locally efficient and improved doubly robust DiD estimator for the ATT with panel data, as described in [2]_. The estimator is given by .. math:: \widehat{\tau}_{imp}^{dr, p} = \mathbb{E}_{n}\left[\left(\widehat{w}_{1}^{p}(D) - \widehat{w}_{0}^{p}(D, X ; \widehat{\gamma}^{ipt})\right) \left(\Delta Y - \mu_{0, \Delta}^{lin, p}(X ; \widehat{\beta}_{0, \Delta}^{wls, p})\right)\right]. The estimator uses a logistic propensity score model estimated via inverse probability tilting as described in [1]_, and a linear regression model for the outcome evolution of control units (estimated via weighted least squares). Parameters ---------- y1 : ndarray A 1D array of outcomes from the post-treatment period. y0 : ndarray A 1D array of outcomes from the pre-treatment period. d : ndarray A 1D array of group indicators (=1 if treated, =0 otherwise). covariates : ndarray A 2D array of covariates for propensity score and outcome regression. An intercept must be included if desired. i_weights : ndarray, optional A 1D array of observation weights. If None, weights are uniform. Weights are normalized to have a mean of 1. boot : bool, default=False Whether to use bootstrap for inference. boot_type : {"weighted", "multiplier"}, default="weighted" Type of bootstrap to perform. nboot : int, default=999 Number of bootstrap repetitions. influence_func : bool, default=False Whether to return the influence function. trim_level : float, default=0.995 The trimming level for the propensity score. Returns ------- DRDIDPanelResult A NamedTuple containing the ATT estimate, standard error, confidence interval, bootstrap draws, propensity score flag, and influence function. Notes ----- The nuisance parameters are estimated as described in Section 3.1 of [2]_. The propensity score parameters are estimated using the inverse probability tilting estimator from [1]_ .. math:: \widehat{\gamma}^{ipt} = \arg\max_{\gamma \in \Gamma} \mathbb{E}_{n} \left[D X^{\prime} \gamma - (1-D) \exp(X^{\prime} \gamma)\right] and the outcome regression coefficients are estimated using weighted least squares .. math:: \widehat{\beta}_{0, \Delta}^{wls, p} = \arg\min_{b \in \Theta} \mathbb{E}_{n} \left[\left.\frac{\Lambda(X^{\prime} \hat{\gamma}^{ipt})}{1-\Lambda(X^{\prime} \hat{\gamma}^{ipt})} (\Delta Y - X^{\prime} b)^{2} \right\rvert\, D=0\right]. The resulting estimator is not only locally efficient and doubly robust for the ATT, but it is also doubly robust for inference. See Also -------- drdid_panel : Locally efficient doubly robust DiD estimator for the ATT with panel data. References ---------- .. [1] Graham, B. S., Pinto, C. C., & Egel, D. (2012). *Inverse probability tilting for moment condition models with missing data.* The Review of Economic Studies, 79(3), 1053-1079. https://doi.org/10.1093/restud/rdr047 .. [2] Sant'Anna, P. H., & Zhao, J. (2020). *Doubly robust difference-in-differences estimators.* Journal of Econometrics, 219(1), 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003 arXiv preprint: https://arxiv.org/abs/1812.01723 """ xp = get_backend() y1, y0, d, covariates, i_weights, n_units = _validate_and_preprocess_inputs(xp, y1, y0, d, covariates, i_weights) delta_y = y1 - y0 pscore_ipt_results = calculate_pscore_ipt(D=d, X=covariates, iw=i_weights) ps_fit = xp.clip(xp.asarray(pscore_ipt_results), 1e-6, 1 - 1e-6) trim_ps = xp.ones(n_units, dtype=bool) trim_ps[d == 0] = ps_fit[d == 0] < trim_level outcome_reg = wols_panel(delta_y=delta_y, d=d, x=covariates, ps=ps_fit, i_weights=i_weights) out_delta = outcome_reg.out_reg dr_att_summand_num = trim_ps * (1 - (1 - d) / (1 - ps_fit)) * (delta_y - out_delta) dr_att = xp.mean(i_weights * dr_att_summand_num) / xp.mean(d * i_weights) mean_d_weights = xp.mean(d * i_weights) att_inf_func = i_weights * trim_ps * (dr_att_summand_num - d * dr_att) / mean_d_weights att_inf_func = to_numpy(att_inf_func) dr_att = float(dr_att) dr_boot = None if not boot: se_dr_att = np.std(att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units uci = dr_att + 1.96 * se_dr_att lci = dr_att - 1.96 * se_dr_att else: if nboot is None: nboot = 999 if boot_type == "multiplier": dr_boot = mboot_did(att_inf_func, nboot) se_dr_att = stats.iqr(dr_boot, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) if se_dr_att > 0: cv = np.nanquantile(np.abs(dr_boot / se_dr_att), 0.95) uci = dr_att + cv * se_dr_att lci = dr_att - cv * se_dr_att else: uci = lci = dr_att warnings.warn("Bootstrap standard error is zero.", UserWarning) else: # "weighted" dr_boot = wboot_drdid_imp_panel( delta_y=delta_y, d=d, x=covariates, i_weights=i_weights, n_bootstrap=nboot, trim_level=trim_level ) se_dr_att = stats.iqr(dr_boot - dr_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) if se_dr_att > 0: cv = np.nanquantile(np.abs((dr_boot - dr_att) / se_dr_att), 0.95) uci = dr_att + cv * se_dr_att lci = dr_att - cv * se_dr_att else: uci = lci = dr_att warnings.warn("Bootstrap standard error is zero.", UserWarning) if not influence_func: att_inf_func = None args = { "panel": True, "estMethod": "imp", "boot": boot, "boot.type": boot_type, "nboot": nboot, "type": "dr", "trim.level": trim_level, } return DRDIDPanelResult( att=dr_att, se=se_dr_att, uci=uci, lci=lci, boots=dr_boot, att_inf_func=att_inf_func, args=args, )
def _validate_and_preprocess_inputs( xp, y1, y0, d, covariates, i_weights, ): """Validate and preprocess input arrays.""" d = xp.asarray(d).flatten() n_units = len(d) y1 = xp.asarray(y1).flatten() y0 = xp.asarray(y0).flatten() covariates = xp.ones((n_units, 1)) if covariates is None else xp.asarray(covariates) if i_weights is None: i_weights = xp.ones(n_units) else: i_weights = xp.asarray(i_weights).flatten() if xp.any(i_weights < 0): raise ValueError("i_weights must be non-negative.") i_weights /= xp.mean(i_weights) return y1, y0, d, covariates, i_weights, n_units