Source code for moderndid.drdid.bootstrap.boot_panel

"""Bootstrap inference for panel data DiD estimators."""

import warnings

import numpy as np
import statsmodels.api as sm

from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_logistic_irls, cupy_wls

from ..estimators.wols import wols_panel
from ..propensity.aipw_estimators import aipw_did_panel
from ..propensity.pscore_ipt import calculate_pscore_ipt
from ..utils import _validate_inputs


[docs] def wboot_drdid_imp_panel(delta_y, d, x, i_weights, n_bootstrap=1000, trim_level=0.995, random_state=None): r"""Compute improved bootstrap estimates for doubly-robust DiD with panel data. Implements the improved bootstrap inference for the doubly-robust difference-in-differences estimator with panel data, using the inverse probability tilting method for propensity score estimation. Parameters ---------- delta_y : ndarray A 1D array representing the difference in outcomes between the post-treatment and pre-treatment periods (Y_post - Y_pre) for each unit. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_units, n_features). i_weights : ndarray A 1D array of individual observation weights for each unit. n_bootstrap : int Number of bootstrap iterations. Default is 1000. trim_level : float Maximum propensity score value for control units to avoid extreme weights. Default is 0.995. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- aipw_did_panel : The underlying AIPW estimator for panel data. wols_panel : Weighted OLS for outcome regression in panel data. """ n_units = _validate_inputs({"delta_y": delta_y, "d": d, "i_weights": i_weights}, x, n_bootstrap, trim_level) rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) b_weights = i_weights * v try: ps_b = calculate_pscore_ipt(D=d, X=x, iw=b_weights) except (ValueError, np.linalg.LinAlgError, RuntimeError) as e: warnings.warn(f"Propensity score estimation (IPT) failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue ps_b = np.clip(ps_b, 1e-6, 1 - 1e-6) trim_ps_mask = np.ones_like(ps_b, dtype=bool) control_mask = d == 0 trim_ps_mask[control_mask] = ps_b[control_mask] < trim_level b_weights_trimmed = b_weights.copy() b_weights_trimmed[~trim_ps_mask] = 0 try: out_reg_results = wols_panel(delta_y=delta_y, d=d, x=x, ps=ps_b, i_weights=b_weights) out_reg_b = out_reg_results.out_reg except (ValueError, np.linalg.LinAlgError, KeyError) as e: warnings.warn(f"Outcome regression failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue try: att_b = aipw_did_panel( delta_y=delta_y, d=d, ps=ps_b, out_reg=out_reg_b, i_weights=b_weights_trimmed, ) bootstrap_estimates[b] = att_b except (ValueError, ZeroDivisionError, RuntimeWarning) as e: warnings.warn(f"AIPW computation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to issues in propensity score estimation, outcome regression, " "or the AIPW calculation itself (e.g. perfect prediction, collinearity, " "small effective sample sizes after weighting/trimming).", UserWarning, ) return bootstrap_estimates
[docs] def wboot_ipw_panel(delta_y, d, x, i_weights, n_bootstrap=1000, trim_level=0.995, random_state=None): r"""Compute bootstrap estimates for IPW DiD with panel data. Implements a bootstrapped Inverse Probability Weighting (IPW) difference-in-differences estimator for panel data. Unlike doubly-robust methods, this uses only propensity scores without outcome regression. Parameters ---------- delta_y : ndarray A 1D array representing the difference in outcomes between the post-treatment and pre-treatment periods (Y_post - Y_pre) for each unit. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_units, n_features). i_weights : ndarray A 1D array of individual observation weights for each unit. n_bootstrap : int Number of bootstrap iterations. Default is 1000. trim_level : float Maximum propensity score value for control units to avoid extreme weights. Default is 0.995. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- wboot_drdid_imp_panel : Improved doubly-robust bootstrap for panel data. wboot_dr_tr_panel : Traditional doubly-robust bootstrap for panel data. """ n_units = _validate_inputs({"delta_y": delta_y, "d": d, "i_weights": i_weights}, x, n_bootstrap, trim_level) if len(np.unique(d)) < 2: warnings.warn("Treatment indicator `d` has no variation. Cannot compute IPW. Returning NaNs.", UserWarning) return np.full(n_bootstrap, np.nan) rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) b_weights = i_weights * v try: ps_b = _fit_logistic_boot(d, x, b_weights) except (ValueError, np.linalg.LinAlgError) as e: warnings.warn(f"Propensity score estimation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue ps_b = np.clip(ps_b, 1e-6, 1 - 1e-6) trim_ps_mask = np.ones_like(ps_b, dtype=bool) control_mask = d == 0 trim_ps_mask[control_mask] = ps_b[control_mask] < trim_level b_weights_trimmed = b_weights.copy() b_weights_trimmed[~trim_ps_mask] = 0 try: ipw_weights = d - ps_b * (1 - d) / (1 - ps_b) numerator = np.sum(b_weights_trimmed * trim_ps_mask * ipw_weights * delta_y) denominator = np.sum(b_weights_trimmed * d) if denominator == 0: warnings.warn(f"No effectively treated units in bootstrap {b}. ATT will be NaN.", UserWarning) bootstrap_estimates[b] = np.nan else: att_b = numerator / denominator bootstrap_estimates[b] = att_b except (ValueError, ZeroDivisionError, RuntimeWarning) as e: warnings.warn(f"IPW computation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to issues in propensity score estimation or IPW calculation " "(e.g. perfect prediction, small effective sample sizes after trimming).", UserWarning, ) return bootstrap_estimates
[docs] def wboot_std_ipw_panel(delta_y, d, x, i_weights, n_bootstrap=1000, trim_level=0.995, random_state=None): r"""Compute bootstrap estimates for standardized IPW DiD with panel data. Implements a bootstrapped standardized Inverse Probability Weighting (IPW) difference-in-differences estimator for panel data. This estimator uses standardized weights to compute separate means for treated and control groups. Parameters ---------- delta_y : ndarray A 1D array representing the difference in outcomes between the post-treatment and pre-treatment periods (Y_post - Y_pre) for each unit. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_units, n_features). i_weights : ndarray A 1D array of individual observation weights for each unit. n_bootstrap : int Number of bootstrap iterations. Default is 1000. trim_level : float Maximum propensity score value for control units to avoid extreme weights. Default is 0.995. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- wboot_ipw_panel : Non-standardized IPW bootstrap for panel data. calculate_pscore_ipt : IPT propensity score estimation. """ n_units = _validate_inputs({"delta_y": delta_y, "d": d, "i_weights": i_weights}, x, n_bootstrap, trim_level) rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) b_weights = i_weights * v try: ps_b = _fit_logistic_boot(d, x, b_weights) except (ValueError, np.linalg.LinAlgError) as e: warnings.warn(f"Propensity score estimation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue ps_b = np.clip(ps_b, 1e-6, 1 - 1e-6) trim_ps_mask = np.ones_like(ps_b, dtype=bool) control_mask = d == 0 trim_ps_mask[control_mask] = ps_b[control_mask] < trim_level w_treat_b = trim_ps_mask * b_weights * d w_cont_b = trim_ps_mask * b_weights * (1 - d) * ps_b / (1 - ps_b) try: mean_w_treat = np.mean(w_treat_b) mean_w_cont = np.mean(w_cont_b) if mean_w_treat == 0: warnings.warn(f"No effectively treated units in bootstrap {b}. ATT will be NaN.", UserWarning) bootstrap_estimates[b] = np.nan continue if mean_w_cont == 0: warnings.warn(f"No effectively control units in bootstrap {b}. ATT will be NaN.", UserWarning) bootstrap_estimates[b] = np.nan continue aipw_1 = np.mean(w_treat_b * delta_y) / mean_w_treat aipw_0 = np.mean(w_cont_b * delta_y) / mean_w_cont att_b = aipw_1 - aipw_0 bootstrap_estimates[b] = att_b except (ValueError, ZeroDivisionError, RuntimeWarning) as e: warnings.warn(f"Standardized IPW computation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to issues in propensity score estimation or standardized IPW calculation " "(e.g. perfect prediction, small effective sample sizes after trimming).", UserWarning, ) return bootstrap_estimates
[docs] def wboot_dr_tr_panel(delta_y, d, x, i_weights, n_bootstrap=1000, trim_level=0.995, random_state=None): r"""Compute bootstrap estimates for traditional doubly-robust DiD with panel data. Implements a traditional bootstrap approach for doubly-robust difference-in-differences with panel data that uses standard logistic regression for propensity score estimation. Parameters ---------- delta_y : ndarray A 1D array representing the difference in outcomes between the post-treatment and pre-treatment periods (Y_post - Y_pre) for each unit. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_units, n_features). i_weights : ndarray A 1D array of individual observation weights for each unit. n_bootstrap : int Number of bootstrap iterations. Default is 1000. trim_level : float Maximum propensity score value for control units to avoid extreme weights. Default is 0.995. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- wboot_drdid_imp_panel : Improved bootstrap using IPT propensity scores. aipw_did_panel : The underlying AIPW estimator for panel data. """ n_units = _validate_inputs({"delta_y": delta_y, "d": d, "i_weights": i_weights}, x, n_bootstrap, trim_level) rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) b_weights = i_weights * v try: ps_b = _fit_logistic_boot(d, x, b_weights) except (ValueError, np.linalg.LinAlgError) as e: warnings.warn(f"Propensity score estimation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue ps_b = np.clip(ps_b, 1e-6, 1 - 1e-6) trim_ps_mask = np.ones_like(ps_b, dtype=bool) control_mask = d == 0 trim_ps_mask[control_mask] = ps_b[control_mask] < trim_level b_weights_trimmed = b_weights.copy() b_weights_trimmed[~trim_ps_mask] = 0 try: out_reg_results = wols_panel(delta_y=delta_y, d=d, x=x, ps=ps_b, i_weights=b_weights) out_reg_b = out_reg_results.out_reg except (ValueError, np.linalg.LinAlgError, KeyError) as e: warnings.warn(f"Outcome regression failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue try: att_b = aipw_did_panel( delta_y=delta_y, d=d, ps=ps_b, out_reg=out_reg_b, i_weights=b_weights_trimmed, ) bootstrap_estimates[b] = att_b except (ValueError, ZeroDivisionError, RuntimeWarning) as e: warnings.warn(f"AIPW computation failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to issues in propensity score estimation, outcome regression, " "or the AIPW calculation itself (e.g. perfect prediction, collinearity, " "small effective sample sizes after weighting/trimming).", UserWarning, ) return bootstrap_estimates
[docs] def wboot_reg_panel(delta_y, d, x, i_weights, n_bootstrap=1000, random_state=None): r"""Compute bootstrap estimates for regression-based robust DiD with panel data. This function implements a regression-based difference-in-differences estimator that uses outcome regression on the control group only, without propensity scores. It is designed for settings with 2 time periods and 2 groups. Parameters ---------- delta_y : ndarray A 1D array representing the difference in outcomes between the post-treatment and pre-treatment periods (Y_post - Y_pre) for each unit. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_units, n_features). i_weights : ndarray A 1D array of individual observation weights for each unit. n_bootstrap : int Number of bootstrap iterations. Default is 1000. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- wboot_drdid_imp_panel : Doubly-robust bootstrap with propensity scores. wboot_ipw_panel : IPW bootstrap without outcome regression. """ n_units = _validate_inputs({"delta_y": delta_y, "d": d, "i_weights": i_weights}, x, n_bootstrap, trim_level=0.5) rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) b_weights = i_weights * v control_mask = d == 0 n_control = np.sum(control_mask) if n_control < x.shape[1]: warnings.warn(f"Insufficient control units ({n_control}) for regression in bootstrap {b}.", UserWarning) bootstrap_estimates[b] = np.nan continue try: x_control = x[control_mask] y_control = delta_y[control_mask] w_control = b_weights[control_mask] xp = get_backend() if xp is not np: reg_coeff, _ = cupy_wls( xp.asarray(y_control, dtype=xp.float64), xp.asarray(x_control, dtype=xp.float64), xp.asarray(w_control, dtype=xp.float64), ) reg_coeff = to_numpy(reg_coeff) else: glm_model = sm.GLM( y_control, x_control, family=sm.families.Gaussian(link=sm.families.links.Identity()), var_weights=w_control, ) glm_results = glm_model.fit() reg_coeff = glm_results.params out_reg_b = x @ reg_coeff except (np.linalg.LinAlgError, ValueError) as e: warnings.warn(f"Outcome regression failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan continue numerator = np.mean(b_weights * d * (delta_y - out_reg_b)) denominator = np.mean(b_weights * d) if denominator == 0: warnings.warn(f"No effectively treated units in bootstrap {b}. ATT will be NaN.", UserWarning) bootstrap_estimates[b] = np.nan else: att_b = numerator / denominator bootstrap_estimates[b] = att_b n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to insufficient control units, collinearity in covariates, " "or lack of treated units in bootstrap samples.", UserWarning, ) return bootstrap_estimates
[docs] def wboot_twfe_panel(y, d, post, x, i_weights, n_bootstrap=1000, random_state=None): r"""Compute bootstrap estimates for Two-Way Fixed Effects DiD with panel data. This function implements a bootstrapped Two-Way Fixed Effects (TWFE) difference-in-differences estimator for panel data with 2 periods and 2 groups. This is the traditional DiD regression approach using OLS with treatment-period interaction. Parameters ---------- y : ndarray A 1D array representing the outcome variable for each unit-time observation. Should be stacked with pre-period observations followed by post-period observations. d : ndarray A 1D array representing the treatment indicator (1 for treated, 0 for control) for each unit-time observation. post : ndarray A 1D array representing the post-treatment period indicator (1 for post, 0 for pre) for each unit-time observation. x : ndarray A 2D array of covariates (including intercept if desired) with shape (n_observations, n_features). Should be stacked to match y, d, and post. i_weights : ndarray A 1D array of individual observation weights for each unit-time observation. n_bootstrap : int Number of bootstrap iterations. Default is 1000. random_state : int, RandomState instance or None Controls the random number generation for reproducibility. Returns ------- ndarray A 1D array of bootstrap ATT estimates with length n_bootstrap. See Also -------- wboot_reg_panel : Regression-based bootstrap for DiD with panel data. wboot_drdid_imp_panel : Doubly-robust bootstrap for DiD with panel data. """ n_obs = _validate_inputs({"y": y, "d": d, "post": post, "i_weights": i_weights}, x, n_bootstrap, trim_level=0.5) if n_obs % 2 != 0: raise ValueError("Number of observations must be even for balanced panel data.") n_units = n_obs // 2 rng = np.random.RandomState(random_state) bootstrap_estimates = np.zeros(n_bootstrap) for b in range(n_bootstrap): v = rng.exponential(scale=1.0, size=n_units) v = np.repeat(v, 2) b_weights = i_weights * v has_intercept = np.all(x[:, 0] == 1.0) if x.shape[1] > 0 else False if has_intercept: design_matrix = np.column_stack([x, d, post, d * post]) interaction_idx = x.shape[1] + 2 else: design_matrix = np.column_stack([np.ones(n_obs), d, post, d * post, x]) interaction_idx = 3 try: xp = get_backend() if xp is not np: beta, _ = cupy_wls( xp.asarray(y, dtype=xp.float64), xp.asarray(design_matrix, dtype=xp.float64), xp.asarray(b_weights, dtype=xp.float64), ) att_b = float(to_numpy(beta)[interaction_idx]) else: wls_model = sm.WLS(y, design_matrix, weights=b_weights) wls_results = wls_model.fit() att_b = wls_results.params[interaction_idx] bootstrap_estimates[b] = att_b except (np.linalg.LinAlgError, ValueError) as e: warnings.warn(f"TWFE regression failed in bootstrap {b}: {e}", UserWarning) bootstrap_estimates[b] = np.nan n_failed = np.sum(np.isnan(bootstrap_estimates)) if n_failed > 0: warnings.warn( f"{n_failed} out of {n_bootstrap} bootstrap iterations failed and resulted in NaN. " "This might be due to collinearity in the design matrix or numerical instability.", UserWarning, ) return bootstrap_estimates
def _fit_logistic_boot(d, x, b_weights): """Dispatch logistic regression for bootstrap to GPU or statsmodels.""" xp = get_backend() if xp is not np: _, ps = cupy_logistic_irls( xp.asarray(d, dtype=xp.float64), xp.asarray(x, dtype=xp.float64), xp.asarray(b_weights, dtype=xp.float64), ) return to_numpy(ps) logit_model = sm.GLM(d, x, family=sm.families.Binomial(), freq_weights=b_weights) logit_results = logit_model.fit() return logit_results.predict(x)