Source code for moderndid.drdid.estimators.ipw_did_panel

"""Inverse propensity weighted DiD estimator for panel data."""

import warnings
from typing import NamedTuple

import numpy as np
import statsmodels.api as sm
from scipy import stats

from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_logistic_irls

from ..bootstrap.boot_mult import mboot_did
from ..bootstrap.boot_panel import wboot_ipw_panel


class IPWDIDPanelResult(NamedTuple):
    """Result from the IPW DiD Panel estimator."""

    att: float
    se: float
    uci: float
    lci: float
    boots: np.ndarray | None
    att_inf_func: np.ndarray | None
    args: dict


[docs] def ipw_did_panel( y1, y0, d, covariates, i_weights=None, boot=False, boot_type="weighted", nboot=999, influence_func=False, trim_level=0.995, ): r"""Compute the inverse propensity weighted DiD estimator for the ATT with panel data. Implements the inverse propensity weighted (IPW) estimator for the ATT with panel data, as proposed by [1]_ and discussed in [2]_. The estimator is given by equation (2.4) in [2]_ as .. math:: \widehat{\tau}^{ipw,p} = \frac{1}{\mathbb{E}_{n}[D]} \mathbb{E}_{n} \left[\frac{D-\widehat{\pi}(X)}{1-\widehat{\pi}(X)}\left(Y_{1}-Y_{0}\right)\right]. IPW weights are not normalized to sum up to one, that is, the estimator is of the Horwitz-Thompson type. Parameters ---------- y1 : ndarray A 1D array of outcomes from the post-treatment period. y0 : ndarray A 1D array of outcomes from the pre-treatment period. d : ndarray A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise). covariates : ndarray or None A 2D array of covariates for propensity score estimation. An intercept must be included if desired. If None, leads to an unconditional DiD estimator. i_weights : ndarray, optional A 1D array of observation weights. If None, weights are uniform. Weights are normalized to have a mean of 1. boot : bool, default=False Whether to use bootstrap for inference. boot_type : {"weighted", "multiplier"}, default="weighted" Type of bootstrap to perform. nboot : int, default=999 Number of bootstrap repetitions. influence_func : bool, default=False Whether to return the influence function. trim_level : float, default=0.995 The trimming level for the propensity score. Returns ------- IPWDIDPanelResult A NamedTuple containing the ATT estimate, standard error, confidence interval, bootstrap draws, and influence function. See Also -------- ipw_did_rc : IPW DiD estimator for repeated cross-section data. std_ipw_did_panel : Standardized version of Abadie's IPW DiD estimator for panel data. References ---------- .. [1] Abadie, A. (2005). Semiparametric difference-in-differences estimators. The Review of Economic Studies, 72(1), 1-19. https://doi.org/10.1111/0034-6527.00321 .. [2] Sant'Anna, P. H., & Zhao, J. (2020). Doubly robust difference-in-differences estimators. Journal of Econometrics, 219(1), 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003 Notes ----- The IPW estimator is less robust than doubly robust methods as it relies solely on correct specification of the propensity score model. We recommend using doubly robust methods when there is uncertainty about model specification. """ y1, y0, d, covariates, i_weights, n_units, delta_y = _validate_and_preprocess_inputs( y1, y0, d, covariates, i_weights ) ps_fit, W, ps_results = _compute_propensity_score(d, covariates, i_weights) trim_ps = ps_fit < 1.01 # This effectively creates all True for treated units trim_ps[d == 0] = ps_fit[d == 0] < trim_level weights = _compute_weights(d, ps_fit, i_weights, trim_ps) att_treat = weights["w_treat"] * delta_y att_cont = weights["w_cont"] * delta_y mean_trim_weight_d = np.mean(i_weights * d) if mean_trim_weight_d == 0: warnings.warn("No effectively treated units after trimming.", UserWarning) return IPWDIDPanelResult(att=np.nan, se=np.nan, uci=np.nan, lci=np.nan, boots=None, att_inf_func=None, args={}) eta_treat = np.mean(att_treat) / mean_trim_weight_d eta_cont = np.mean(att_cont) / mean_trim_weight_d ipw_att = eta_treat - eta_cont influence_quantities = _get_influence_quantities(d, covariates, ps_fit, i_weights, W, ps_results, n_units) att_inf_func = _compute_influence_function( att_treat, att_cont, d, covariates, i_weights, ipw_att, influence_quantities ) # Inference if not boot: se_att = np.std(att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units uci = ipw_att + 1.96 * se_att lci = ipw_att - 1.96 * se_att ipw_boot = None else: if nboot is None: nboot = 999 if boot_type == "multiplier": ipw_boot = mboot_did(att_inf_func, nboot) se_att = stats.iqr(ipw_boot, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) cv = np.nanquantile(np.abs(ipw_boot / se_att), 0.95) uci = ipw_att + cv * se_att lci = ipw_att - cv * se_att else: # "weighted" ipw_boot = wboot_ipw_panel( delta_y=delta_y, d=d, x=covariates, i_weights=i_weights, n_bootstrap=nboot, trim_level=trim_level ) se_att = stats.iqr(ipw_boot - ipw_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) cv = np.nanquantile(np.abs((ipw_boot - ipw_att) / se_att), 0.95) uci = ipw_att + cv * se_att lci = ipw_att - cv * se_att if not influence_func: att_inf_func = None boot_type_str = "multiplier" if boot_type == "multiplier" else "weighted" args = { "panel": True, "normalized": False, "boot": boot, "boot_type": boot_type_str, "nboot": nboot, "type": "ipw", "trim_level": trim_level, } return IPWDIDPanelResult( att=ipw_att, se=se_att, uci=uci, lci=lci, boots=ipw_boot, att_inf_func=att_inf_func, args=args, )
def _validate_and_preprocess_inputs(y1, y0, d, covariates, i_weights): """Validate and preprocess input arrays.""" d = np.asarray(d).flatten() n_units = len(d) delta_y = np.asarray(y1).flatten() - np.asarray(y0).flatten() covariates = np.ones((n_units, 1)) if covariates is None else np.asarray(covariates) # Weights if i_weights is None: i_weights = np.ones(n_units) else: i_weights = np.asarray(i_weights).flatten() if np.any(i_weights < 0): raise ValueError("i_weights must be non-negative.") i_weights = i_weights / np.mean(i_weights) # Check if we have variation in treatment unique_d = np.unique(d) if len(unique_d) < 2: if unique_d[0] == 0: raise ValueError("No treated units found. Cannot estimate treatment effect.") raise ValueError("No control units found. Cannot estimate treatment effect.") return y1, y0, d, covariates, i_weights, n_units, delta_y def _compute_propensity_score(d, covariates, i_weights): """Compute propensity score using logistic regression.""" xp = get_backend() ps_results = None if xp is not np: try: beta, ps_fit = cupy_logistic_irls( xp.asarray(d, dtype=xp.float64), xp.asarray(covariates, dtype=xp.float64), xp.asarray(i_weights, dtype=xp.float64), ) ps_fit = to_numpy(ps_fit) if np.any(np.isnan(to_numpy(beta))): raise ValueError( "Propensity score model coefficients have NA components. \n" "Multicollinearity (or lack of variation) of covariates is a likely reason." ) except (np.linalg.LinAlgError, RuntimeError) as e: raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e else: try: ps_model = sm.GLM(d, covariates, family=sm.families.Binomial(), freq_weights=i_weights) ps_results = ps_model.fit() if not ps_results.converged: warnings.warn("Propensity score estimation did not converge.", UserWarning) if np.any(np.isnan(ps_results.params)): raise ValueError( "Propensity score model coefficients have NA components. \n" "Multicollinearity (or lack of variation) of covariates is a likely reason." ) ps_fit = ps_results.predict(covariates) except np.linalg.LinAlgError as e: raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e ps_fit = np.clip(ps_fit, 1e-6, 1 - 1e-6) W = ps_fit * (1 - ps_fit) * i_weights return ps_fit, W, ps_results def _compute_weights(d, ps_fit, i_weights, trim_ps): """Compute IPW weights.""" w_treat = trim_ps * i_weights * d w_cont = trim_ps * i_weights * ps_fit * (1 - d) / (1 - ps_fit) return { "w_treat": w_treat, "w_cont": w_cont, } def _get_influence_quantities(d, covariates, ps_fit, i_weights, W, ps_results, n_units): """Compute quantities needed for influence function.""" # Asymptotic linear representation of logit's beta's score_ps = (i_weights * (d - ps_fit))[:, np.newaxis] * covariates try: weighted_cov_matrix = covariates.T @ (W[:, np.newaxis] * covariates) hessian_ps = np.linalg.inv(weighted_cov_matrix) * n_units except np.linalg.LinAlgError: hessian_ps = ps_results.cov_params() * n_units asy_lin_rep_ps = score_ps @ hessian_ps return { "asy_lin_rep_ps": asy_lin_rep_ps, } def _compute_influence_function(att_treat, att_cont, d, covariates, i_weights, ipw_att, influence_quantities): """Compute the influence function for IPW estimator.""" asy_lin_rep_ps = influence_quantities["asy_lin_rep_ps"] # Leading term of the influence function: no estimation effect att_lin1 = att_treat - att_cont # Derivative matrix (k x 1 vector) mom_logit = np.mean(att_cont[:, np.newaxis] * covariates, axis=0) # Now the influence function related to estimation effect of pscores att_lin2 = asy_lin_rep_ps @ mom_logit # Get the influence function of the IPW estimator (put all pieces together) mean_weight_d = np.mean(i_weights * d) att_inf_func = (att_lin1 - att_lin2 - i_weights * d * ipw_att) / mean_weight_d return att_inf_func