Source code for moderndid.drdid.estimators.ipw_did_rc

"""Inverse propensity weighted DiD estimator for repeated cross-sections data."""

import warnings
from typing import NamedTuple

import numpy as np
import statsmodels.api as sm
from scipy import stats

from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_logistic_irls

from ..bootstrap.boot_ipw_rc import wboot_ipw_rc
from ..bootstrap.boot_mult import mboot_did


class IPWDIDRCResult(NamedTuple):
    """Result from the IPW DiD RC estimator."""

    att: float
    se: float
    uci: float
    lci: float
    boots: np.ndarray | None
    att_inf_func: np.ndarray | None
    args: dict


[docs] def ipw_did_rc( y, post, d, covariates, i_weights=None, boot=False, boot_type="weighted", nboot=999, influence_func=False, trim_level=0.995, ): r"""Compute the inverse propensity weighted DiD estimator for the ATT with repeated cross-section data. Implements the inverse propensity weighted (IPW) estimator for the ATT with repeated cross-section data, as proposed by [1]_ and discussed in [2]_. The estimator is given by the sample analogue of equation (2.3) in [2]_ as .. math:: \tau = \frac{1}{\mathbb{E}[D]} \mathbb{E}\left[\frac{D-p(X)}{1-p(X)} \frac{T-\lambda}{\lambda(1-\lambda)} Y\right]. IPW weights are not normalized to sum up to one, that is, the estimator is of the Horwitz-Thompson type. Parameters ---------- y : ndarray A 1D array of outcomes from both pre- and post-treatment periods. post : ndarray A 1D array of post-treatment dummies (1 if post-treatment, 0 if pre-treatment). d : ndarray A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise). covariates : ndarray or None A 2D array of covariates for propensity score estimation. An intercept must be included if desired. If None, leads to an unconditional DiD estimator. i_weights : ndarray, optional A 1D array of observation weights. If None, weights are uniform. Weights are normalized to have a mean of 1. boot : bool, default=False Whether to use bootstrap for inference. boot_type : {"weighted", "multiplier"}, default="weighted" Type of bootstrap to perform. nboot : int, default=999 Number of bootstrap repetitions. influence_func : bool, default=False Whether to return the influence function. trim_level : float, default=0.995 The trimming level for the propensity score. Returns ------- IPWDIDRCResult A NamedTuple containing the ATT estimate, standard error, confidence interval, bootstrap draws, and influence function. See Also -------- std_ipw_did_rc : Standardized version of Abadie's IPW DiD estimator for repeated cross-section data. References ---------- .. [1] Abadie, A. (2005). Semiparametric difference-in-differences estimators. The Review of Economic Studies, 72(1), 1-19. https://doi.org/10.1111/0034-6527.00321 .. [2] Sant'Anna, P. H., & Zhao, J. (2020). Doubly robust difference-in-differences estimators. Journal of Econometrics, 219(1), 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003 Notes ----- The IPW estimator is less robust than doubly robust methods as it relies solely on correct specification of the propensity score model. We recommend using doubly robust methods when there is uncertainty about model specification. """ y, post, d, covariates, i_weights, n_units = _validate_and_preprocess_inputs(y, post, d, covariates, i_weights) ps_fit, ps_weights = _compute_propensity_score(d, covariates, i_weights) trim_ps = np.ones(n_units, dtype=bool) trim_ps[d == 0] = ps_fit[d == 0] < trim_level weights = _compute_weights(d, post, ps_fit, i_weights, trim_ps) pi_hat = np.mean(i_weights * d) lambda_hat = np.mean(i_weights * post) one_minus_lambda_hat = np.mean(i_weights * (1 - post)) if pi_hat == 0: warnings.warn("No treated units after trimming.", UserWarning) return IPWDIDRCResult(att=np.nan, se=np.nan, uci=np.nan, lci=np.nan, boots=None, att_inf_func=None, args={}) if lambda_hat in (0, 1): warnings.warn(f"Lambda is {lambda_hat}, cannot compute IPW estimator.", UserWarning) return IPWDIDRCResult(att=np.nan, se=np.nan, uci=np.nan, lci=np.nan, boots=None, att_inf_func=None, args={}) influence_components = _get_influence_components(y, weights, pi_hat, lambda_hat, one_minus_lambda_hat) ipw_att = (influence_components["att_treat_post"] - influence_components["att_treat_pre"]) - ( influence_components["att_cont_post"] - influence_components["att_cont_pre"] ) influence_quantities = _get_influence_quantities(d, covariates, ps_fit, ps_weights, i_weights, n_units) att_inf_func = _compute_influence_function( post, d, covariates, i_weights, pi_hat, lambda_hat, one_minus_lambda_hat, influence_components, influence_quantities, ) # Inference if not boot: se_att = np.std(att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units uci = ipw_att + 1.96 * se_att lci = ipw_att - 1.96 * se_att ipw_boot = None else: if boot_type == "multiplier": ipw_boot = mboot_did(att_inf_func, nboot) se_att = stats.iqr(ipw_boot, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) cv = np.nanquantile(np.abs(ipw_boot / se_att), 0.95) uci = ipw_att + cv * se_att lci = ipw_att - cv * se_att else: # "weighted" ipw_boot = wboot_ipw_rc( y=y, post=post, d=d, x=covariates, i_weights=i_weights, n_bootstrap=nboot, trim_level=trim_level ) se_att = stats.iqr(ipw_boot - ipw_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) cv = np.nanquantile(np.abs((ipw_boot - ipw_att) / se_att), 0.95) uci = ipw_att + cv * se_att lci = ipw_att - cv * se_att if not influence_func: att_inf_func = None args = { "panel": False, "normalized": False, "boot": boot, "boot_type": boot_type, "nboot": nboot, "type": "ipw", "trim_level": trim_level, } return IPWDIDRCResult( att=ipw_att, se=se_att, uci=uci, lci=lci, boots=ipw_boot, att_inf_func=att_inf_func, args=args, )
def _validate_and_preprocess_inputs(y, post, d, covariates, i_weights): """Validate and preprocess input arrays.""" d = np.asarray(d).flatten() n_units = len(d) y = np.asarray(y).flatten() post = np.asarray(post).flatten() covariates = np.ones((n_units, 1)) if covariates is None else np.asarray(covariates) if i_weights is None: i_weights = np.ones(n_units) else: i_weights = np.asarray(i_weights).flatten() if np.any(i_weights < 0): raise ValueError("i_weights must be non-negative.") i_weights = i_weights / np.mean(i_weights) if not np.any(d == 1): raise ValueError("No treated units found. Cannot estimate treatment effect.") if not np.any(d == 0): raise ValueError("No control units found. Cannot estimate treatment effect.") if not np.any(post == 1): raise ValueError("No post-treatment observations found.") if not np.any(post == 0): raise ValueError("No pre-treatment observations found.") return y, post, d, covariates, i_weights, n_units def _compute_propensity_score(d, covariates, i_weights): """Compute propensity score using logistic regression.""" xp = get_backend() if xp is not np: try: beta, ps_fit = cupy_logistic_irls( xp.asarray(d, dtype=xp.float64), xp.asarray(covariates, dtype=xp.float64), xp.asarray(i_weights, dtype=xp.float64), ) ps_fit = to_numpy(ps_fit) if np.any(np.isnan(to_numpy(beta))): raise ValueError( "Propensity score model coefficients have NA components. \n" "Multicollinearity (or lack of variation) of covariates is a likely reason." ) except (np.linalg.LinAlgError, RuntimeError) as e: raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e else: try: pscore_model = sm.GLM(d, covariates, family=sm.families.Binomial(), freq_weights=i_weights) pscore_results = pscore_model.fit() if not pscore_results.converged: warnings.warn("GLM algorithm did not converge.", UserWarning) if np.any(np.isnan(pscore_results.params)): raise ValueError( "Propensity score model coefficients have NA components. \n" "Multicollinearity (or lack of variation) of covariates is a likely reason." ) ps_fit = pscore_results.predict(covariates) except np.linalg.LinAlgError as e: raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e ps_fit = np.clip(ps_fit, 1e-6, 1 - 1e-6) ps_weights = ps_fit * (1 - ps_fit) * i_weights return ps_fit, ps_weights def _compute_weights(d, post, ps_fit, i_weights, trim_ps): """Compute IPW weights.""" w_treat_pre = i_weights * d * (1 - post) w_treat_post = i_weights * d * post w_cont_pre = trim_ps * i_weights * ps_fit * (1 - d) * (1 - post) / (1 - ps_fit) w_cont_post = trim_ps * i_weights * ps_fit * (1 - d) * post / (1 - ps_fit) return { "w_treat_pre": w_treat_pre, "w_treat_post": w_treat_post, "w_cont_pre": w_cont_pre, "w_cont_post": w_cont_post, } def _get_influence_components(y, weights, pi_hat, lambda_hat, one_minus_lambda_hat): """Compute influence function components.""" w_treat_pre = weights["w_treat_pre"] w_treat_post = weights["w_treat_post"] w_cont_pre = weights["w_cont_pre"] w_cont_post = weights["w_cont_post"] # Elements of the influence function (summands) eta_treat_pre = w_treat_pre * y / (pi_hat * one_minus_lambda_hat) eta_treat_post = w_treat_post * y / (pi_hat * lambda_hat) eta_cont_pre = w_cont_pre * y / (pi_hat * one_minus_lambda_hat) eta_cont_post = w_cont_post * y / (pi_hat * lambda_hat) # Estimator of each component att_treat_pre = np.mean(eta_treat_pre) att_treat_post = np.mean(eta_treat_post) att_cont_pre = np.mean(eta_cont_pre) att_cont_post = np.mean(eta_cont_post) return { "eta_treat_pre": eta_treat_pre, "eta_treat_post": eta_treat_post, "eta_cont_pre": eta_cont_pre, "eta_cont_post": eta_cont_post, "att_treat_pre": att_treat_pre, "att_treat_post": att_treat_post, "att_cont_pre": att_cont_pre, "att_cont_post": att_cont_post, } def _get_influence_quantities(d, covariates, ps_fit, ps_weights, i_weights, n_units): """Compute quantities needed for influence function.""" # Asymptotic linear representation of logit's beta's score_ps = (i_weights * (d - ps_fit))[:, np.newaxis] * covariates try: hessian_ps = np.linalg.inv(covariates.T @ (ps_weights[:, np.newaxis] * covariates)) * n_units except np.linalg.LinAlgError: warnings.warn("Failed to invert Hessian matrix. Using pseudo-inverse.", UserWarning) hessian_ps = np.linalg.pinv(covariates.T @ (ps_weights[:, np.newaxis] * covariates)) * n_units asy_lin_rep_ps = score_ps @ hessian_ps return { "asy_lin_rep_ps": asy_lin_rep_ps, } def _compute_influence_function( post, d, covariates, i_weights, pi_hat, lambda_hat, one_minus_lambda_hat, influence_components, influence_quantities ): """Compute the influence function for IPW estimator.""" # Extract components eta_treat_pre = influence_components["eta_treat_pre"] eta_treat_post = influence_components["eta_treat_post"] eta_cont_pre = influence_components["eta_cont_pre"] eta_cont_post = influence_components["eta_cont_post"] att_treat_pre = influence_components["att_treat_pre"] att_treat_post = influence_components["att_treat_post"] att_cont_pre = influence_components["att_cont_pre"] att_cont_post = influence_components["att_cont_post"] asy_lin_rep_ps = influence_quantities["asy_lin_rep_ps"] # Influence function of the treated components inf_treat_post1 = eta_treat_post - att_treat_post inf_treat_post2 = -(i_weights * d - pi_hat) * att_treat_post / pi_hat inf_treat_post3 = -(i_weights * post - lambda_hat) * att_treat_post / lambda_hat inf_treat_post = inf_treat_post1 + inf_treat_post2 + inf_treat_post3 inf_treat_pre1 = eta_treat_pre - att_treat_pre inf_treat_pre2 = -(i_weights * d - pi_hat) * att_treat_pre / pi_hat inf_treat_pre3 = -(i_weights * (1 - post) - one_minus_lambda_hat) * att_treat_pre / one_minus_lambda_hat inf_treat_pre = inf_treat_pre1 + inf_treat_pre2 + inf_treat_pre3 # Influence function of control components inf_cont_post1 = eta_cont_post - att_cont_post inf_cont_post2 = -(i_weights * d - pi_hat) * att_cont_post / pi_hat inf_cont_post3 = -(i_weights * post - lambda_hat) * att_cont_post / lambda_hat inf_cont_post = inf_cont_post1 + inf_cont_post2 + inf_cont_post3 inf_cont_pre1 = eta_cont_pre - att_cont_pre inf_cont_pre2 = -(i_weights * d - pi_hat) * att_cont_pre / pi_hat inf_cont_pre3 = -(i_weights * (1 - post) - one_minus_lambda_hat) * att_cont_pre / one_minus_lambda_hat inf_cont_pre = inf_cont_pre1 + inf_cont_pre2 + inf_cont_pre3 # Estimation effect from the propensity score parameters # Derivative matrix (k x 1 vector) mom_logit_pre = -np.mean((eta_cont_pre)[:, np.newaxis] * covariates, axis=0) mom_logit_post = -np.mean((eta_cont_post)[:, np.newaxis] * covariates, axis=0) # Now the influence function related to estimation effect of pscores inf_logit = asy_lin_rep_ps @ (mom_logit_post - mom_logit_pre) # Get the influence function of the IPW estimator (put all pieces together) att_inf_func = (inf_treat_post - inf_treat_pre) - (inf_cont_post - inf_cont_pre) + inf_logit return att_inf_func