"""Standardized inverse propensity weighted DiD estimator for panel data."""
import warnings
from typing import NamedTuple
import numpy as np
import statsmodels.api as sm
from scipy import stats
from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_logistic_irls
from ..bootstrap.boot_mult import mboot_did
from ..bootstrap.boot_panel import wboot_std_ipw_panel
class StdIPWDIDPanelResult(NamedTuple):
"""Result from the standardized IPW DiD Panel estimator."""
att: float
se: float
uci: float
lci: float
boots: np.ndarray | None
att_inf_func: np.ndarray | None
args: dict
[docs]
def std_ipw_did_panel(
y1,
y0,
d,
covariates,
i_weights=None,
boot=False,
boot_type="weighted",
nboot=999,
influence_func=False,
trim_level=0.995,
):
r"""Compute the standardized inverse propensity weighted DiD estimator for the ATT with panel data.
Implements the standardized inverse propensity weighted (IPW) estimator for the ATT with panel data,
as proposed by [1]_ and discussed in [2]_. This is the Hajek-type estimator, where weights are
normalized to sum to one. The estimator is given by equation (4.1) in [2]_ as
.. math::
\widehat{\tau}_{std}^{ipw,p} = \mathbb{E}_{n}\left[\left(\widehat{w}_{1}^{p}(D) -
\widehat{w}_{0}^{p}(D,X;\widehat{\gamma})\right)
\left(Y_{1}-Y_{0}\right)\right].
Parameters
----------
y1 : ndarray
A 1D array of outcomes from the post-treatment period.
y0 : ndarray
A 1D array of outcomes from the pre-treatment period.
d : ndarray
A 1D array of group indicators (1 if treated in post-treatment, 0 otherwise).
covariates : ndarray or None
A 2D array of covariates for propensity score estimation. An intercept must be
included if desired. If None, leads to an unconditional DiD estimator.
i_weights : ndarray, optional
A 1D array of observation weights. If None, weights are uniform.
Weights are normalized to have a mean of 1.
boot : bool, default=False
Whether to use bootstrap for inference.
boot_type : {"weighted", "multiplier"}, default="weighted"
Type of bootstrap to perform.
nboot : int, default=999
Number of bootstrap repetitions.
influence_func : bool, default=False
Whether to return the influence function.
trim_level : float, default=0.995
The trimming level for the propensity score.
Returns
-------
StdIPWDIDPanelResult
A NamedTuple containing the ATT estimate, standard error, confidence interval,
bootstrap draws, and influence function.
See Also
--------
ipw_did_panel : Non-standardized version of Abadie's IPW DiD estimator for panel data.
std_ipw_did_rc : Standardized IPW DiD estimator for repeated cross-section data.
References
----------
.. [1] Abadie, A. (2005). Semiparametric difference-in-differences estimators.
The Review of Economic Studies, 72(1), 1-19. https://doi.org/10.1111/0034-6527.00321
.. [2] Sant'Anna, P. H., & Zhao, J. (2020). Doubly robust difference-in-differences estimators.
Journal of Econometrics, 219(1), 101-122. https://doi.org/10.1016/j.jeconom.2020.06.003
Notes
-----
The standardized IPW estimator normalizes weights within each group, making it a Hajek-type estimator.
This can provide more stable estimates when there is substantial variation in weights across groups.
"""
xp = get_backend()
y1, y0, d, covariates, i_weights, n_units, delta_y = _validate_and_preprocess_inputs(
xp, y1, y0, d, covariates, i_weights
)
ps_fit, W, ps_results = _compute_propensity_score(xp, d, covariates, i_weights)
trim_ps = ps_fit < 1.01 # This effectively creates all True for treated units
trim_ps[d == 0] = ps_fit[d == 0] < trim_level
weights = _compute_weights(d, ps_fit, i_weights, trim_ps)
mean_w_treat = xp.mean(weights["w_treat"])
mean_w_cont = xp.mean(weights["w_cont"])
if mean_w_treat == 0:
warnings.warn("No effectively treated units after trimming.", UserWarning)
return StdIPWDIDPanelResult(
att=np.nan, se=np.nan, uci=np.nan, lci=np.nan, boots=None, att_inf_func=None, args={}
)
if mean_w_cont == 0:
warnings.warn("No effectively control units after trimming.", UserWarning)
return StdIPWDIDPanelResult(
att=np.nan, se=np.nan, uci=np.nan, lci=np.nan, boots=None, att_inf_func=None, args={}
)
eta_treat = weights["w_treat"] * delta_y / mean_w_treat
eta_cont = weights["w_cont"] * delta_y / mean_w_cont
att_treat = xp.mean(eta_treat)
att_cont = xp.mean(eta_cont)
ipw_att = att_treat - att_cont
influence_quantities = _get_influence_quantities(xp, d, covariates, ps_fit, i_weights, W, ps_results, n_units)
att_inf_func = _compute_influence_function(
xp,
eta_treat,
eta_cont,
att_treat,
att_cont,
delta_y,
covariates,
weights,
mean_w_treat,
mean_w_cont,
influence_quantities,
)
att_inf_func = to_numpy(att_inf_func)
ipw_att = float(ipw_att)
# Inference
if not boot:
se_att = np.std(att_inf_func, ddof=1) * np.sqrt(n_units - 1) / n_units
uci = ipw_att + 1.96 * se_att
lci = ipw_att - 1.96 * se_att
ipw_boot = None
else:
if nboot is None:
nboot = 999
if boot_type == "multiplier":
ipw_boot = mboot_did(att_inf_func, nboot)
se_att = stats.iqr(ipw_boot, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
cv = np.nanquantile(np.abs(ipw_boot / se_att), 0.95)
uci = ipw_att + cv * se_att
lci = ipw_att - cv * se_att
else: # "weighted"
ipw_boot = wboot_std_ipw_panel(
delta_y=delta_y, d=d, x=covariates, i_weights=i_weights, n_bootstrap=nboot, trim_level=trim_level
)
se_att = stats.iqr(ipw_boot - ipw_att, nan_policy="omit") / (stats.norm.ppf(0.75) - stats.norm.ppf(0.25))
cv = np.nanquantile(np.abs((ipw_boot - ipw_att) / se_att), 0.95)
uci = ipw_att + cv * se_att
lci = ipw_att - cv * se_att
if not influence_func:
att_inf_func = None
boot_type_str = "multiplier" if boot_type == "multiplier" else "weighted"
args = {
"panel": True,
"normalized": True,
"boot": boot,
"boot_type": boot_type_str,
"nboot": nboot,
"type": "ipw",
"trim_level": trim_level,
}
return StdIPWDIDPanelResult(
att=ipw_att,
se=se_att,
uci=uci,
lci=lci,
boots=ipw_boot,
att_inf_func=att_inf_func,
args=args,
)
def _validate_and_preprocess_inputs(xp, y1, y0, d, covariates, i_weights):
"""Validate and preprocess input arrays."""
d = xp.asarray(d).flatten()
n_units = len(d)
delta_y = xp.asarray(y1).flatten() - xp.asarray(y0).flatten()
covariates = xp.ones((n_units, 1)) if covariates is None else xp.asarray(covariates)
# Weights
if i_weights is None:
i_weights = xp.ones(n_units)
else:
i_weights = xp.asarray(i_weights).flatten()
if xp.any(i_weights < 0):
raise ValueError("i_weights must be non-negative.")
i_weights = i_weights / xp.mean(i_weights)
# Check if we have variation in treatment
unique_d = xp.unique(d)
if len(unique_d) < 2:
if unique_d[0] == 0:
raise ValueError("No treated units found. Cannot estimate treatment effect.")
raise ValueError("No control units found. Cannot estimate treatment effect.")
return y1, y0, d, covariates, i_weights, n_units, delta_y
def _compute_propensity_score(xp, d, covariates, i_weights):
"""Compute propensity score using logistic regression."""
ps_results = None
if xp is not np:
try:
beta, ps_fit = cupy_logistic_irls(
xp.asarray(d, dtype=xp.float64),
xp.asarray(covariates, dtype=xp.float64),
xp.asarray(i_weights, dtype=xp.float64),
)
if xp.any(xp.isnan(beta)):
raise ValueError(
"Propensity score model coefficients have NA components. \n"
"Multicollinearity (or lack of variation) of covariates is a likely reason."
)
except (np.linalg.LinAlgError, RuntimeError) as e:
raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e
else:
try:
ps_model = sm.GLM(d, covariates, family=sm.families.Binomial(), freq_weights=i_weights)
ps_results = ps_model.fit()
if not ps_results.converged:
warnings.warn("Propensity score estimation did not converge.", UserWarning)
if np.any(np.isnan(ps_results.params)):
raise ValueError(
"Propensity score model coefficients have NA components. \n"
"Multicollinearity (or lack of variation) of covariates is a likely reason."
)
ps_fit = ps_results.predict(covariates)
except np.linalg.LinAlgError as e:
raise ValueError("Failed to estimate propensity scores due to singular matrix.") from e
ps_fit = xp.clip(xp.asarray(ps_fit), 1e-6, 1 - 1e-6)
W = ps_fit * (1 - ps_fit) * i_weights
return ps_fit, W, ps_results
def _compute_weights(d, ps_fit, i_weights, trim_ps):
"""Compute standardized IPW weights."""
w_treat = trim_ps * i_weights * d
w_cont = trim_ps * i_weights * ps_fit * (1 - d) / (1 - ps_fit)
return {
"w_treat": w_treat,
"w_cont": w_cont,
}
def _get_influence_quantities(xp, d, covariates, ps_fit, i_weights, W, ps_results, n_units):
"""Compute quantities needed for influence function."""
# Asymptotic linear representation of logit's beta's
score_ps = (i_weights * (d - ps_fit))[:, xp.newaxis] * covariates
try:
weighted_cov_matrix = covariates.T @ (W[:, xp.newaxis] * covariates)
hessian_ps = xp.linalg.inv(weighted_cov_matrix) * n_units
except np.linalg.LinAlgError:
hessian_ps = ps_results.cov_params() * n_units
asy_lin_rep_ps = score_ps @ hessian_ps
return {
"asy_lin_rep_ps": asy_lin_rep_ps,
}
def _compute_influence_function(
xp,
eta_treat,
eta_cont,
att_treat,
att_cont,
delta_y,
covariates,
weights,
mean_w_treat,
mean_w_cont,
influence_quantities,
):
"""Compute the influence function for standardized IPW estimator."""
w_treat = weights["w_treat"]
w_cont = weights["w_cont"]
asy_lin_rep_ps = influence_quantities["asy_lin_rep_ps"]
# Influence function of treated component
# Leading term of the influence function: no estimation effect
inf_treat = eta_treat - w_treat * att_treat / mean_w_treat
# Influence function of control component
# Leading term of the influence function: no estimation effect
inf_cont = eta_cont - w_cont * att_cont / mean_w_cont
# Derivative matrix (k x 1 vector)
mom_logit = xp.mean((w_cont * (delta_y - att_cont))[:, xp.newaxis] * covariates, axis=0) / mean_w_cont
# Now the influence function related to estimation effect of pscores
inf_cont_ps = asy_lin_rep_ps @ mom_logit
# Get the influence function of the standardized IPW estimator (put all pieces together)
att_inf_func = inf_treat - (inf_cont + inf_cont_ps)
return att_inf_func