Source code for moderndid.drdid.estimators.twfe_did_panel

"""Two-way fixed effects DiD estimator for panel data."""

from typing import NamedTuple

import numpy as np
import polars as pl
import statsmodels.api as sm
from scipy import stats

from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_wls

from ..bootstrap.boot_mult import mboot_twfep_did
from ..bootstrap.boot_panel import wboot_twfe_panel
from ..ordid import ordid


class TWFEDIDPanelResult(NamedTuple):
    """Result from the two-way fixed effects DiD Panel estimator."""

    att: float
    se: float
    uci: float
    lci: float
    boots: np.ndarray | None
    att_inf_func: np.ndarray | None
    args: dict


[docs] def twfe_did_panel( y1, y0, d, covariates=None, i_weights=None, boot=False, boot_type="weighted", nboot=999, influence_func=False, ): r"""Compute linear two-way fixed effects DiD estimator for the ATT with panel data. Implements the linear two-way fixed effects (TWFE) estimator for the ATT with panel data, as illustrated in [1]_. The estimator is based on the regression model from equation (2.5) of [1]_ as .. math:: Y_{it} = \alpha_1 + \alpha_2 T_i + \alpha_3 D_i + \tau^{fe}(T_i \cdot D_i) + \theta' X_i + \varepsilon_{it}. Parameters ---------- y1 : ndarray A 1D array of outcomes from the post-treatment period. y0 : ndarray A 1D array of outcomes from the pre-treatment period. d : ndarray A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise). covariates : ndarray, optional A 2D array of covariates to be used in the regression estimation. If None, the estimator uses ordid function. The design matrix will always include an intercept. i_weights : ndarray, optional A 1D array of weights. If None, then every observation has equal weight. Weights are normalized to have mean 1. boot : bool, default=False Whether to compute bootstrap standard errors. boot_type : {"weighted", "multiplier"}, default="weighted" Type of bootstrap to be performed (not relevant if boot = False). nboot : int, default=999 Number of bootstrap repetitions (not relevant if boot = False). influence_func : bool, default=False Whether to return the influence function. Returns ------- TWFEDIDPanelResult A NamedTuple containing the TWFE DiD point estimate, standard error, confidence interval, bootstrap draws, and influence function. Warnings -------- This estimator generally does not recover the ATT. We encourage users to adopt alternative specifications. See Also -------- reg_did_panel : Outcome regression DiD for panel data. drdid_imp_panel : Improved doubly robust DiD for panel data. ipw_did_panel : Inverse propensity weighted DiD for panel data. References ---------- .. [1] Sant'Anna, P. H. C. and Zhao, J. (2020), "Doubly Robust Difference-in-Differences Estimators." Journal of Econometrics, Vol. 219 (1), pp. 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003 Notes ----- The TWFE estimator is implemented by stacking the panel data and running a regression with treatment-period interaction. When no covariates are provided, this function calls the ``ordid`` function directly. """ d, covariates, i_weights, n_units = _validate_and_preprocess_inputs(d, covariates, i_weights) x, post, d_stacked, y_stacked, i_weights_stacked = _stack_data(y0, y1, d, covariates, i_weights, n_units) if x is not None: if np.all(d == 0): att = 0.0 se = 0.0 uci = 0.0 lci = 0.0 boots = None att_inf_func = np.zeros(len(y_stacked)) if influence_func else None elif np.all(d == 1): raise ValueError("All units are treated. Cannot identify ATT with TWFE.") else: att, att_inf_func = _fit_twfe_regression(y_stacked, d_stacked, post, x, i_weights_stacked) if not boot: se = np.std(att_inf_func, ddof=1) / np.sqrt(len(att_inf_func)) uci = att + 1.96 * se lci = att - 1.96 * se boots = None else: if nboot is None: nboot = 999 if boot_type == "multiplier": boots = mboot_twfep_did(att_inf_func, n_units, nboot) se = stats.iqr(boots) / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) critical_value = np.quantile(np.abs(boots / se), 0.95) uci = att + critical_value * se lci = att - critical_value * se else: # "weighted" design_matrix = np.column_stack( [ np.ones_like(y_stacked), d_stacked, post, d_stacked * post, x, ] ) boots = wboot_twfe_panel( y=y_stacked, d=d_stacked, post=post, x=design_matrix[:, 4:] if x is not None else np.ones((len(y_stacked), 1)), i_weights=i_weights_stacked, n_bootstrap=nboot, ) se = stats.iqr(boots - att) / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25)) critical_value = np.quantile(np.abs((boots - att) / se), 0.95) uci = att + critical_value * se lci = att - critical_value * se else: unit_ids = np.arange(1, n_units + 1) data_long = pl.DataFrame( { "y": y_stacked, "post": post.astype(int), "d": d_stacked.astype(int), "id": np.tile(unit_ids, 2), "w": i_weights_stacked, } ) reg = ordid( data=data_long, yname="y", tname="post", idname="id", treatname="d", weightsname="w", xformla=None, panel=True, boot=boot, boot_type=boot_type, n_boot=nboot, inf_func=influence_func, ) att = reg.att se = reg.se uci = reg.uci lci = reg.lci boots = reg.boots att_inf_func = reg.att_inf_func if not influence_func: att_inf_func = None boot_type_str = "multiplier" if boot_type == "multiplier" else "weighted" args = { "panel": True, "boot": boot, "boot_type": boot_type_str, "nboot": nboot, "type": "twfe", } return TWFEDIDPanelResult( att=att, se=se, uci=uci, lci=lci, boots=boots, att_inf_func=att_inf_func, args=args, )
def _validate_and_preprocess_inputs(d, covariates, i_weights): """Validate and preprocess input arrays.""" d = np.asarray(d).flatten() n_units = len(d) if i_weights is None: i_weights = np.ones(n_units) else: i_weights = np.asarray(i_weights).flatten() if np.any(i_weights < 0): raise ValueError("i_weights must be non-negative.") i_weights = i_weights / np.mean(i_weights) if covariates is not None: covariates = np.asarray(covariates) if covariates.ndim == 1: covariates = covariates.reshape(-1, 1) if np.all(covariates[:, 0] == 1): covariates = covariates[:, 1:] if covariates.shape[1] == 0: covariates = None return d, covariates, i_weights, n_units def _stack_data(y0, y1, d, covariates, i_weights, n_units): """Stack panel data for TWFE regression.""" x = None if covariates is not None: x = np.concatenate([covariates, covariates]) if covariates.ndim == 1 else np.vstack([covariates, covariates]) post = np.concatenate([np.zeros(n_units), np.ones(n_units)]) d_stacked = np.concatenate([d, d]) y_stacked = np.concatenate([np.asarray(y0).flatten(), np.asarray(y1).flatten()]) i_weights_stacked = np.concatenate([i_weights, i_weights]) return x, post, d_stacked, y_stacked, i_weights_stacked def _fit_twfe_regression(y_stacked, d_stacked, post, x, i_weights_stacked): """Fit TWFE regression and compute influence function.""" design_matrix = np.column_stack( [ np.ones_like(y_stacked), d_stacked, post, d_stacked * post, x, ] ) try: xp = get_backend() if xp is not np: beta, _ = cupy_wls(xp.asarray(y_stacked), xp.asarray(design_matrix), xp.asarray(i_weights_stacked)) params = to_numpy(beta) att = params[3] fitted = design_matrix @ params residuals = y_stacked - fitted else: wls_model = sm.WLS(y_stacked, design_matrix, weights=i_weights_stacked) wls_results = wls_model.fit() att = wls_results.params[3] residuals = wls_results.resid x_prime_x = design_matrix.T @ (i_weights_stacked[:, np.newaxis] * design_matrix) / len(y_stacked) if np.linalg.cond(x_prime_x) > 1e15: raise ValueError("The regression design matrix is singular. Consider removing some covariates.") x_prime_x_inv = np.linalg.inv(x_prime_x) influence_reg = (i_weights_stacked[:, np.newaxis] * design_matrix * residuals[:, np.newaxis]) @ x_prime_x_inv selection_theta = np.zeros(design_matrix.shape[1]) selection_theta[3] = 1 att_inf_func = influence_reg @ selection_theta except (np.linalg.LinAlgError, ValueError) as e: raise ValueError(f"Failed to fit TWFE regression model: {e}") from e return att, att_inf_func