"""Outcome regression DiD estimator for repeated cross-sections data."""
from typing import NamedTuple
import numpy as np
import statsmodels.api as sm
from scipy import stats
from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_wls
from ..bootstrap.boot_mult import mboot_did
from ..bootstrap.boot_reg_rc import wboot_reg_rc
class RegDIDRCResult(NamedTuple):
"""Result from the regression DiD RC estimator."""
att: float
se: float
uci: float
lci: float
boots: np.ndarray | None
att_inf_func: np.ndarray | None
args: dict
[docs]
def reg_did_rc(
y,
post,
d,
covariates=None,
i_weights=None,
boot=False,
boot_type="weighted",
nboot=999,
influence_func=False,
):
r"""Compute the outcome regression DiD estimator for the ATT with repeated cross-section data.
Implements outcome regression difference-in-differences (DiD) estimator for the ATT when stationary
repeated cross-sectional data are available. The estimator is a sample analogue of equation (2.2)
in [2]_. The estimator is given by
.. math::
\widehat{\tau}^{reg} = \bar{Y}_{1,1} - \left[\bar{Y}_{1,0} + n_{treat}^{-1}
\sum_{i|D_i=1} (\widehat{\mu}_{0,1}(X_i) - \widehat{\mu}_{0,0}(X_i))\right].
The estimator follows the same spirit of the nonparametric estimators proposed by [1]_, though here the
outcome regression models are assumed to be linear in covariates (parametric). The nuisance parameters (outcome
regression coefficients) are estimated via ordinary least squares.
Parameters
----------
y : ndarray
A 1D array of outcomes from both pre- and post-treatment periods.
post : ndarray
A 1D array of post-treatment dummies (1 if post-treatment, 0 if pre-treatment).
d : ndarray
A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise).
covariates : ndarray, optional
A 2D array of covariates to be used in the regression estimation. If None,
this leads to an unconditional DiD estimator.
i_weights : ndarray, optional
A 1D array of weights. If None, then every observation has equal weight.
Weights are normalized to have mean 1.
boot : bool, optional
Whether to use bootstrap for inference. Default is False.
boot_type : str, optional
Type of bootstrap ("weighted" or "multiplier"). Default is "weighted".
nboot : int, optional
Number of bootstrap repetitions. Default is 999.
influence_func : bool, optional
Whether to return the influence function. Default is False.
Returns
-------
RegDIDRCResult
A named tuple containing:
- att : float
The outcome regression DiD point estimate.
- se : float
The outcome regression DiD standard error.
- uci : float
Upper bound of a 95% confidence interval.
- lci : float
Lower bound of a 95% confidence interval.
- boots : ndarray or None
Bootstrap draws of the ATT if boot=True.
- att_inf_func : ndarray or None
Influence function values if influence_func=True.
- args : dict
Arguments used in the estimation.
See Also
--------
ipw_did_rc : Inverse propensity weighted DiD for repeated cross-sections.
drdid_rc : Doubly robust DiD for repeated cross-sections.
References
----------
.. [1] Heckman, J., Ichimura, H., and Todd, P. (1997), "Matching as an Econometric Evaluation
Estimator: Evidence from Evaluating a Job Training Programme", Review of Economic Studies,
vol. 64(4), p. 605–654. https://doi.org/10.2307/2971733
.. [2] Sant'Anna, P. H. C. and Zhao, J. (2020), "Doubly Robust Difference-in-Differences Estimators."
Journal of Econometrics, Vol. 219 (1), pp. 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003
"""
y, post, d, int_cov, i_weights, n_units = _validate_and_preprocess_inputs(y, post, d, covariates, i_weights)
out_y_pre, out_y_post = _fit_outcome_regressions(y, post, d, int_cov, i_weights)
weights = _compute_weights(d, post, i_weights)
reg_att_treat_pre = weights["w_treat_pre"] * y
reg_att_treat_post = weights["w_treat_post"] * y
reg_att_cont = weights["w_cont"] * (out_y_post - out_y_pre)
eta_treat_pre = np.mean(reg_att_treat_pre) / np.mean(weights["w_treat_pre"])
eta_treat_post = np.mean(reg_att_treat_post) / np.mean(weights["w_treat_post"])
eta_cont = np.mean(reg_att_cont) / np.mean(weights["w_cont"])
reg_att = (eta_treat_post - eta_treat_pre) - eta_cont
influence_quantities = _get_influence_quantities(y, post, d, int_cov, out_y_pre, out_y_post, i_weights, n_units)
reg_att_inf_func = _compute_influence_function(
reg_att_treat_pre,
reg_att_treat_post,
reg_att_cont,
eta_treat_pre,
eta_treat_post,
eta_cont,
weights,
int_cov,
influence_quantities,
)
# Inference
if not boot:
se_reg_att = np.std(reg_att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units
uci = reg_att + 1.96 * se_reg_att
lci = reg_att - 1.96 * se_reg_att
reg_boot = None
else:
if boot_type == "multiplier":
reg_boot = mboot_did(reg_att_inf_func, nboot)
se_reg_att = stats.iqr(reg_boot) / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
cv = np.quantile(np.abs(reg_boot / se_reg_att), 0.95)
uci = reg_att + cv * se_reg_att
lci = reg_att - cv * se_reg_att
else: # "weighted"
reg_boot = wboot_reg_rc(y, post, d, int_cov, i_weights, n_bootstrap=nboot)
se_reg_att = stats.iqr(reg_boot - reg_att) / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
cv = np.quantile(np.abs((reg_boot - reg_att) / se_reg_att), 0.95)
uci = reg_att + cv * se_reg_att
lci = reg_att - cv * se_reg_att
if not influence_func:
reg_att_inf_func = None
args = {
"panel": False,
"boot": boot,
"boot_type": boot_type,
"nboot": nboot,
"type": "or",
}
return RegDIDRCResult(
att=reg_att,
se=se_reg_att,
uci=uci,
lci=lci,
boots=reg_boot,
att_inf_func=reg_att_inf_func,
args=args,
)
def _validate_and_preprocess_inputs(y, post, d, covariates, i_weights):
"""Validate and preprocess input arrays."""
d = np.asarray(d).flatten()
post = np.asarray(post).flatten()
y = np.asarray(y).flatten()
n_units = len(d)
if covariates is None:
int_cov = np.ones((n_units, 1))
else:
int_cov = np.asarray(covariates)
if int_cov.ndim == 1:
int_cov = int_cov.reshape(-1, 1)
if i_weights is None:
i_weights = np.ones(n_units)
else:
i_weights = np.asarray(i_weights).flatten()
if np.any(i_weights < 0):
raise ValueError("i_weights must be non-negative.")
i_weights = i_weights / np.mean(i_weights)
return y, post, d, int_cov, i_weights, n_units
def _fit_outcome_regressions(y, post, d, int_cov, i_weights):
"""Fit outcome regression models for control units in pre and post periods."""
xp = get_backend()
# Pre-treatment period
pre_filter = (d == 0) & (post == 0)
n_control_pre = np.sum(pre_filter)
if n_control_pre == 0:
raise ValueError("No control units in pre-treatment period.")
if n_control_pre < int_cov.shape[1]:
raise ValueError("Insufficient control units in pre-treatment period for regression.")
if xp is not np:
try:
beta_pre, _ = cupy_wls(
xp.asarray(y[pre_filter]),
xp.asarray(int_cov[pre_filter]),
xp.asarray(i_weights[pre_filter]),
)
reg_coeff_pre = to_numpy(beta_pre)
except (np.linalg.LinAlgError, RuntimeError) as e:
raise ValueError(f"Failed to fit pre-period regression: {e}") from e
else:
try:
glm_pre = sm.GLM(
y[pre_filter],
int_cov[pre_filter],
family=sm.families.Gaussian(link=sm.families.links.Identity()),
var_weights=i_weights[pre_filter],
)
res_pre = glm_pre.fit()
reg_coeff_pre = res_pre.params
except (np.linalg.LinAlgError, ValueError) as e:
raise ValueError(f"Failed to fit pre-period regression: {e}") from e
if np.any(np.isnan(reg_coeff_pre)):
raise ValueError(
"Outcome regression model coefficients have NA components. "
"Multicollinearity of covariates is probably the reason for it."
)
out_y_pre = int_cov @ reg_coeff_pre
# Post-treatment period
post_filter = (d == 0) & (post == 1)
n_control_post = np.sum(post_filter)
if n_control_post == 0:
raise ValueError("No control units in post-treatment period.")
if n_control_post < int_cov.shape[1]:
raise ValueError("Insufficient control units in post-treatment period for regression.")
if xp is not np:
try:
beta_post, _ = cupy_wls(
xp.asarray(y[post_filter]),
xp.asarray(int_cov[post_filter]),
xp.asarray(i_weights[post_filter]),
)
reg_coeff_post = to_numpy(beta_post)
except (np.linalg.LinAlgError, RuntimeError) as e:
raise ValueError(f"Failed to fit post-period regression: {e}") from e
else:
try:
glm_post = sm.GLM(
y[post_filter],
int_cov[post_filter],
family=sm.families.Gaussian(link=sm.families.links.Identity()),
var_weights=i_weights[post_filter],
)
res_post = glm_post.fit()
reg_coeff_post = res_post.params
except (np.linalg.LinAlgError, ValueError) as e:
raise ValueError(f"Failed to fit post-period regression: {e}") from e
if np.any(np.isnan(reg_coeff_post)):
raise ValueError(
"Outcome regression model coefficients have NA components. "
"Multicollinearity (or lack of variation) of covariates is probably the reason for it."
)
out_y_post = int_cov @ reg_coeff_post
return out_y_pre, out_y_post
def _compute_weights(d, post, i_weights):
"""Compute weights for outcome regression DiD estimator."""
w_treat_pre = i_weights * d * (1 - post)
w_treat_post = i_weights * d * post
w_cont = i_weights * d
return {
"w_treat_pre": w_treat_pre,
"w_treat_post": w_treat_post,
"w_cont": w_cont,
}
def _get_influence_quantities(y, post, d, int_cov, out_y_pre, out_y_post, i_weights, n_units):
"""Compute quantities needed for influence function."""
# Asymptotic linear representation of OLS parameters in pre-period
weights_ols_pre = i_weights * (1 - d) * (1 - post)
weighted_x_pre = weights_ols_pre[:, np.newaxis] * int_cov
weighted_resid_x_pre = weights_ols_pre[:, np.newaxis] * (y - out_y_pre)[:, np.newaxis] * int_cov
gram_pre = weighted_x_pre.T @ int_cov / n_units
if np.linalg.cond(gram_pre) > 1e15:
raise ValueError(
"The regression design matrix for pre-treatment is singular. Consider removing some covariates."
)
gram_inv_pre = np.linalg.inv(gram_pre)
asy_lin_rep_ols_pre = weighted_resid_x_pre @ gram_inv_pre
# Asymptotic linear representation of OLS parameters in post-period
weights_ols_post = i_weights * (1 - d) * post
weighted_x_post = weights_ols_post[:, np.newaxis] * int_cov
weighted_resid_x_post = weights_ols_post[:, np.newaxis] * (y - out_y_post)[:, np.newaxis] * int_cov
gram_post = weighted_x_post.T @ int_cov / n_units
if np.linalg.cond(gram_post) > 1e15:
raise ValueError(
"The regression design matrix for post-treatment is singular. Consider removing some covariates."
)
gram_inv_post = np.linalg.inv(gram_post)
asy_lin_rep_ols_post = weighted_resid_x_post @ gram_inv_post
return {
"asy_lin_rep_ols_pre": asy_lin_rep_ols_pre,
"asy_lin_rep_ols_post": asy_lin_rep_ols_post,
}
def _compute_influence_function(
reg_att_treat_pre,
reg_att_treat_post,
reg_att_cont,
eta_treat_pre,
eta_treat_post,
eta_cont,
weights,
int_cov,
influence_quantities,
):
"""Compute the influence function for outcome regression estimator."""
w_treat_pre = weights["w_treat_pre"]
w_treat_post = weights["w_treat_post"]
w_cont = weights["w_cont"]
asy_lin_rep_ols_pre = influence_quantities["asy_lin_rep_ols_pre"]
asy_lin_rep_ols_post = influence_quantities["asy_lin_rep_ols_post"]
# Influence function of the "treat" component
# Leading term of the influence function
inf_treat_pre = (reg_att_treat_pre - w_treat_pre * eta_treat_pre) / np.mean(w_treat_pre)
inf_treat_post = (reg_att_treat_post - w_treat_post * eta_treat_post) / np.mean(w_treat_post)
inf_treat = inf_treat_post - inf_treat_pre
# Influence function of control component
# Leading term of the influence function: no estimation effect
inf_cont_1 = reg_att_cont - w_cont * eta_cont
# Estimation effect from beta hat (OLS using only controls)
# Derivative matrix (k x 1 vector)
control_ols_derivative = np.mean(w_cont[:, np.newaxis] * int_cov, axis=0)
# Now get the influence function related to the estimation effect related to beta's in post-treatment
inf_cont_2_post = asy_lin_rep_ols_post @ control_ols_derivative
# Now get the influence function related to the estimation effect related to beta's in pre-treatment
inf_cont_2_pre = asy_lin_rep_ols_pre @ control_ols_derivative
# Influence function for the control component
inf_control = (inf_cont_1 + inf_cont_2_post - inf_cont_2_pre) / np.mean(w_cont)
# Get the influence function of the OR estimator (put all pieces together)
reg_att_inf_func = inf_treat - inf_control
return reg_att_inf_func