"""Improved and locally efficient doubly robust DiD estimators for repeated cross-section data."""
import warnings
from typing import NamedTuple
import numpy as np
from scipy import stats
from moderndid.cupy.backend import get_backend, to_numpy
from ..bootstrap.boot_mult import mboot_did
from ..bootstrap.boot_rc_ipt import wboot_drdid_ipt_rc2
from ..propensity.pscore_ipt import calculate_pscore_ipt
from .wols import wols_rc
class DRDIDLocalRCResult(NamedTuple):
"""Result from the drdid Local RC estimator."""
att: float
se: float
uci: float
lci: float
boots: np.ndarray | None
att_inf_func: np.ndarray | None
args: dict
[docs]
def drdid_imp_local_rc(
y,
post,
d,
covariates,
i_weights=None,
boot=False,
boot_type="weighted",
nboot=999,
influence_func=False,
trim_level=0.995,
):
r"""Compute the improved and locally efficient DR-DiD estimator for the ATT with repeated cross-section data.
Implements the locally efficient and improved doubly robust DiD estimator for the ATT
with repeated cross-sectional data. The estimator is implemented as in equation (3.10) of [2]_ as
.. math::
\widehat{\tau}_{2,imp}^{dr,rc} = \widehat{\tau}_{1,imp}^{dr,rc} +
\mathbb{E}_{n}\left[\left(\frac{D}{\mathbb{E}_{n}[D]} - \widehat{w}_{1,1}^{rc}(D,T)\right)
(\mu_{1,1}^{rc}(X; \widehat{\beta}_{1,1}^{ols,rc}) - \mu_{0,1}^{rc}(X; \widehat{\beta}_{0,1}^{wls,rc}))\right]
\\
- \mathbb{E}_{n}\left[\left(\frac{D}{\mathbb{E}_{n}[D]} - \widehat{w}_{1,0}^{rc}(D,T)\right)
(\mu_{1,0}^{rc}(X; \widehat{\beta}_{1,0}^{ols,rc}) - \mu_{0,0}^{rc}(X; \widehat{\beta}_{0,0}^{wls,rc}))\right],
where :math:`\widehat{\tau}_{1, i m p}^{d r, r c}` is the improved DR-DiD estimator that is not locally
efficient and
.. math::
\widehat{\tau}_{1,imp}^{dr,rc} = \mathbb{E}_{n}\left[\left(\widehat{w}_{1}^{rc}(D,T)
- \widehat{w}_{0}^{rc}(D,T,X;\widehat{\gamma}^{ipt})\right)
(Y - \mu_{0,Y}^{lin,rc}(T,X;\widehat{\beta}_{0,1}^{wls,rc}, \widehat{\beta}_{0,0}^{wls,rc}))\right].
This estimator uses a logistic propensity score model and separate linear regression models for the
control and treated groups' outcomes in both pre and post-treatment periods. The resulting estimator
is doubly robust and locally efficient.
Parameters
----------
y : ndarray
A 1D array of outcomes from both pre- and post-treatment periods.
post : ndarray
A 1D array of post-treatment dummies (1 if post-treatment, 0 if pre-treatment).
d : ndarray
A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise).
covariates : ndarray
A 2D array of covariates for propensity score and outcome regression.
An intercept must be included if desired.
i_weights : ndarray, optional
A 1D array of observation weights. If None, weights are uniform.
Weights are normalized to have a mean of 1.
boot : bool, default=False
Whether to use bootstrap for inference.
boot_type : {"weighted", "multiplier"}, default="weighted"
Type of bootstrap to perform.
nboot : int, default=999
Number of bootstrap repetitions.
influence_func : bool, default=False
Whether to return the influence function.
trim_level : float, default=0.995
The trimming level for the propensity score.
Returns
-------
DRDIDLocalRCResult
A NamedTuple containing the ATT estimate, standard error, confidence interval,
bootstrap draws, and influence function.
Notes
-----
The nuisance parameters are estimated as described in Section 3.2 of [2]_.
The propensity score is estimated using the inverse probability tilting estimator from [1]_,
.. math::
\widehat{\gamma}^{ipt} = \arg\max_{\gamma \in \Gamma} \mathbb{E}_{n}
\left[D X^{\prime} \gamma - (1-D) \exp(X^{\prime} \gamma)\right]
The outcome regression coefficients for the control group are estimated using weighted least squares,
.. math::
\widehat{\beta}_{0,t}^{wls,rc} = \arg\min_{b \in \Theta} \mathbb{E}_{n}
\left[\left.\frac{\Lambda(X^{\prime}\hat{\gamma}^{ipt})}{1-\Lambda(X^{\prime}\hat{\gamma}^{ipt})}
(Y-X^{\prime}b)^{2} \right\rvert\, D=0, T=t\right]
and for the treated group using ordinary least squares.
.. math::
\widehat{\beta}_{1,t}^{ols,rc} = \arg\min_{b \in \Theta} \mathbb{E}_{n}
\left[\left(Y-X^{\prime}b\right)^{2} \mid D=1, T=t\right]
The resulting estimator is doubly robust and locally efficient.
See Also
--------
drdid_imp_rc : Improved, but not locally efficient, DR-DiD estimator for repeated cross-section data.
drdid_rc : Locally efficient DR-DiD estimator for repeated cross-section data.
drdid_trad_rc : Traditional (not locally efficient or improved) doubly robust DiD estimator.
References
----------
.. [1] Graham, B. S., Pinto, C. C., & Egel, D. (2012). *Inverse probability tilting for moment
condition models with missing data.* The Review of Economic Studies, 79(3), 1053-1079.
https://doi.org/10.1093/restud/rdr047
.. [2] Sant'Anna, P. H., & Zhao, J. (2020). *Doubly robust difference-in-differences estimators.*
Journal of Econometrics, 219(1), 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003
arXiv preprint: https://arxiv.org/abs/1812.01723
"""
xp = get_backend()
y, post, d, covariates, i_weights, n_units = _validate_and_preprocess_inputs(xp, y, post, d, covariates, i_weights)
ps_fit = calculate_pscore_ipt(D=d, X=covariates, iw=i_weights)
ps_fit = xp.clip(xp.asarray(ps_fit), 1e-6, 1 - 1e-6)
trim_ps = xp.ones(n_units, dtype=bool)
trim_ps[d == 0] = ps_fit[d == 0] < trim_level
out_y_cont_pre_res = wols_rc(y, post, d, covariates, ps_fit, i_weights, pre=True, treat=False)
out_y_cont_post_res = wols_rc(y, post, d, covariates, ps_fit, i_weights, pre=False, treat=False)
out_y_cont_pre = out_y_cont_pre_res.out_reg
out_y_cont_post = out_y_cont_post_res.out_reg
out_y_cont = post * out_y_cont_post + (1 - post) * out_y_cont_pre
out_y_treat_pre_res = wols_rc(y, post, d, covariates, ps_fit, i_weights, pre=True, treat=True)
out_y_treat_pre = out_y_treat_pre_res.out_reg
out_y_treat_post_res = wols_rc(y, post, d, covariates, ps_fit, i_weights, pre=False, treat=True)
out_y_treat_post = out_y_treat_post_res.out_reg
weights = _compute_weights(xp, d, post, ps_fit, i_weights, trim_ps)
influence_components = _get_influence_quantities(
xp, y, out_y_cont, out_y_treat_pre, out_y_treat_post, out_y_cont_pre, out_y_cont_post, weights
)
dr_att, att_inf_func = _compute_influence_function(xp, influence_components, weights)
att_inf_func = to_numpy(att_inf_func)
dr_att = float(dr_att)
# Inference
dr_boot = None
if not boot:
se_dr_att = np.std(att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units
uci = dr_att + 1.96 * se_dr_att
lci = dr_att - 1.96 * se_dr_att
else:
if nboot is None:
nboot = 999
if boot_type == "multiplier":
dr_boot = mboot_did(att_inf_func, nboot)
se_dr_att = stats.iqr(dr_boot, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
if se_dr_att > 0:
cv = np.nanquantile(np.abs(dr_boot / se_dr_att), 0.95)
uci = dr_att + cv * se_dr_att
lci = dr_att - cv * se_dr_att
else:
uci = lci = dr_att
warnings.warn("Bootstrap standard error is zero.", UserWarning)
else: # "weighted"
dr_boot = wboot_drdid_ipt_rc2(
y=y,
post=post,
d=d,
x=covariates,
i_weights=i_weights,
n_bootstrap=nboot,
trim_level=trim_level,
)
se_dr_att = stats.iqr(dr_boot - dr_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
if se_dr_att > 0:
cv = np.nanquantile(np.abs((dr_boot - dr_att) / se_dr_att), 0.95)
uci = dr_att + cv * se_dr_att
lci = dr_att - cv * se_dr_att
else:
uci = lci = dr_att
warnings.warn("Bootstrap standard error is zero.", UserWarning)
if not influence_func:
att_inf_func = None
args = {
"panel": False,
"estMethod": "imp2",
"boot": boot,
"boot.type": boot_type,
"nboot": nboot,
"type": "dr",
"trim.level": trim_level,
}
return DRDIDLocalRCResult(
att=dr_att,
se=se_dr_att,
uci=uci,
lci=lci,
boots=dr_boot,
att_inf_func=att_inf_func,
args=args,
)
def _validate_and_preprocess_inputs(
xp,
y,
post,
d,
covariates,
i_weights,
):
"""Validate and preprocess input arrays."""
d = xp.asarray(d).flatten()
n_units = len(d)
y = xp.asarray(y).flatten()
post = xp.asarray(post).flatten()
covariates = xp.ones((n_units, 1)) if covariates is None else xp.asarray(covariates)
if i_weights is None:
i_weights = xp.ones(n_units)
else:
i_weights = xp.asarray(i_weights).flatten()
if xp.any(i_weights < 0):
raise ValueError("i_weights must be non-negative.")
i_weights = i_weights / xp.mean(i_weights)
return y, post, d, covariates, i_weights, n_units
def _compute_weights(
xp,
d,
post,
ps_fit,
i_weights,
trim_ps,
):
"""Compute weights for locally efficient and improved DR-DiD estimator."""
w_treat_pre = i_weights * d * (1 - post)
w_treat_post = i_weights * d * post
w_cont_pre = trim_ps * i_weights * ps_fit * (1 - d) * (1 - post) / (1 - ps_fit)
w_cont_post = trim_ps * i_weights * ps_fit * (1 - d) * post / (1 - ps_fit)
w_cont_pre = xp.nan_to_num(w_cont_pre, nan=0.0, posinf=0.0, neginf=0.0)
w_cont_post = xp.nan_to_num(w_cont_post, nan=0.0, posinf=0.0, neginf=0.0)
# Additional weights for locally efficient estimator
w_d = i_weights * d
w_dt1 = i_weights * d * post
w_dt0 = i_weights * d * (1 - post)
return {
"w_treat_pre": w_treat_pre,
"w_treat_post": w_treat_post,
"w_cont_pre": w_cont_pre,
"w_cont_post": w_cont_post,
"w_d": w_d,
"w_dt1": w_dt1,
"w_dt0": w_dt0,
}
def _get_influence_quantities(
xp,
y,
out_y_cont,
out_y_treat_pre,
out_y_treat_post,
out_y_cont_pre,
out_y_cont_post,
weights,
):
"""Compute influence function."""
w_treat_pre = weights["w_treat_pre"]
w_treat_post = weights["w_treat_post"]
w_cont_pre = weights["w_cont_pre"]
w_cont_post = weights["w_cont_post"]
w_d = weights["w_d"]
w_dt1 = weights["w_dt1"]
w_dt0 = weights["w_dt0"]
# eta components (influence function summands)
eta_treat_pre = w_treat_pre * (y - out_y_cont) / xp.mean(w_treat_pre)
eta_treat_post = w_treat_post * (y - out_y_cont) / xp.mean(w_treat_post)
eta_cont_pre = w_cont_pre * (y - out_y_cont) / xp.mean(w_cont_pre)
eta_cont_post = w_cont_post * (y - out_y_cont) / xp.mean(w_cont_post)
# Extra components for locally efficient estimator (see Sant'Anna and Zhao (2020))
eta_d_post = w_d * (out_y_treat_post - out_y_cont_post) / xp.mean(w_d)
eta_dt1_post = w_dt1 * (out_y_treat_post - out_y_cont_post) / xp.mean(w_dt1)
eta_d_pre = w_d * (out_y_treat_pre - out_y_cont_pre) / xp.mean(w_d)
eta_dt0_pre = w_dt0 * (out_y_treat_pre - out_y_cont_pre) / xp.mean(w_dt0)
return {
"eta_treat_pre": eta_treat_pre,
"eta_treat_post": eta_treat_post,
"eta_cont_pre": eta_cont_pre,
"eta_cont_post": eta_cont_post,
"eta_d_post": eta_d_post,
"eta_dt1_post": eta_dt1_post,
"eta_d_pre": eta_d_pre,
"eta_dt0_pre": eta_dt0_pre,
}
def _compute_influence_function(
xp,
components,
weights,
):
"""Assemble the locally efficient influence function."""
eta_treat_pre = components["eta_treat_pre"]
eta_treat_post = components["eta_treat_post"]
eta_cont_pre = components["eta_cont_pre"]
eta_cont_post = components["eta_cont_post"]
eta_d_post = components["eta_d_post"]
eta_dt1_post = components["eta_dt1_post"]
eta_d_pre = components["eta_d_pre"]
eta_dt0_pre = components["eta_dt0_pre"]
# Get weights
w_treat_pre = weights["w_treat_pre"]
w_treat_post = weights["w_treat_post"]
w_cont_pre = weights["w_cont_pre"]
w_cont_post = weights["w_cont_post"]
w_d = weights["w_d"]
w_dt1 = weights["w_dt1"]
w_dt0 = weights["w_dt0"]
# Compute ATT
att_treat_pre = xp.mean(eta_treat_pre)
att_treat_post = xp.mean(eta_treat_post)
att_cont_pre = xp.mean(eta_cont_pre)
att_cont_post = xp.mean(eta_cont_post)
att_d_post = xp.mean(eta_d_post)
att_dt1_post = xp.mean(eta_dt1_post)
att_d_pre = xp.mean(eta_d_pre)
att_dt0_pre = xp.mean(eta_dt0_pre)
dr_att = (
(att_treat_post - att_treat_pre)
- (att_cont_post - att_cont_pre)
+ (att_d_post - att_dt1_post)
- (att_d_pre - att_dt0_pre)
)
# Influence functions for treatment component
inf_treat_pre = eta_treat_pre - w_treat_pre * att_treat_pre / xp.mean(w_treat_pre)
inf_treat_post = eta_treat_post - w_treat_post * att_treat_post / xp.mean(w_treat_post)
inf_treat = inf_treat_post - inf_treat_pre
# Influence functions for control component
inf_cont_pre = eta_cont_pre - w_cont_pre * att_cont_pre / xp.mean(w_cont_pre)
inf_cont_post = eta_cont_post - w_cont_post * att_cont_post / xp.mean(w_cont_post)
inf_cont = inf_cont_post - inf_cont_pre
# Base DR influence function
dr_att_inf_func1 = inf_treat - inf_cont
# Adjustment terms for local efficiency
inf_eff1 = eta_d_post - w_d * att_d_post / xp.mean(w_d)
inf_eff2 = eta_dt1_post - w_dt1 * att_dt1_post / xp.mean(w_dt1)
inf_eff3 = eta_d_pre - w_d * att_d_pre / xp.mean(w_d)
inf_eff4 = eta_dt0_pre - w_dt0 * att_dt0_pre / xp.mean(w_dt0)
inf_eff = (inf_eff1 - inf_eff2) - (inf_eff3 - inf_eff4)
# Final locally efficient influence function
att_inf_func = dr_att_inf_func1 + inf_eff
return dr_att, att_inf_func