"""Propensity score estimation using Inverse Probability Tilting (IPT)."""
import warnings
import numpy as np
import scipy.linalg
import scipy.optimize
import scipy.special
import statsmodels.api as sm
from moderndid.cupy.backend import get_backend, to_numpy
from moderndid.cupy.regression import cupy_logistic_irls
[docs]
def calculate_pscore_ipt(D, X, iw, quantiles=None):
r"""Calculate propensity scores using Inverse Probability Tilting for the ATT.
Implements a specific variant of IPT tailored for estimating the
Average Treatment Effect on the Treated (ATT). Instead of re-weighting both
the treated and control groups to match the full sample, it estimates a
propensity score model that implies a re-weighting of the control group to
match the covariate distribution of the treated group. This is achieved by
solving the following optimization problem for the propensity score
parameters :math:`\gamma` given by
.. math::
\widehat{\gamma}^{ipt} = \arg\max_{\gamma \in \Gamma} \mathbb{E}_{n}
\left[D X^{\prime} \gamma - (1-D) \exp(X^{\prime} \gamma)\right].
The first-order condition of this problem implies the balancing property
.. math::
\sum_{i: D_i=1} w_i X_i = \sum_{i: D_i=0} w_i
\frac{\widehat{p}(X_i)}{1-\widehat{p}(X_i)} X_i,
where :math:`\widehat{p}(X) = \text{expit}(X'\widehat{\gamma}^{ipt})` is the estimated
propensity score (i.e., the logistic function, :math:`\exp(v) / (1 + \exp(v))`)
and :math:`w_i` are the observation weights. This property ensures that the
weighted average of covariates in the control group matches the weighted
average in the treated group, which is a key condition for identifying the ATT.
Parameters
----------
D : ndarray
Treatment indicator (1D array).
X : ndarray
Covariate matrix (2D array, n_obs x n_features), must include intercept.
iw : ndarray
Individual weights (1D array).
quantiles : dict[int, list[float]] | None
Dict mapping column indices to quantiles (values between 0 and 1).
For example, {1: [0.25, 0.5, 0.75]} adds 25th, 50th, 75th percentiles
of the 2nd column as balance constraints. Default is None (no quantiles).
Returns
-------
ndarray
Propensity scores.
Notes
-----
The general IPT framework described in [1]_ for the ATE involves solving
two separate moment equations to find weights for the treated and control
groups that balance covariates with the full sample. These are given by
equations (8) and (11) in their paper
.. math::
\frac{1}{N} \sum_{i=1}^{N}\left\{\frac{D_{i}}
{G\left(t\left(X_{i}\right)^{\prime} \delta_{I P T}^{1}\right)}-1\right\}
t\left(X_{i}\right)=0
and
.. math::
\frac{1}{N} \sum_{i=1}^{N}\left\{\frac{1-D_{i}}
{1-G\left(t\left(X_{i}\right)^{\prime} \delta_{I P T}^{0}\right)}-1\right\}
t\left(X_{i}\right)=0.
This implementation, following [2]_, uses a single objective function tailored for
ATT estimation. The function attempts to solve this using a trust-constr optimizer,
falling back to a BFGS optimization of a modified loss function, and finally to a
standard logit model if the IPT optimizations fail.
References
----------
.. [1] Graham, B., Pinto, C., and Egel, D. (2012), "Inverse Probability Tilting for Moment
Condition Models with Missing Data," The Review of Economic Studies, 79(3), 1053-1079.
https://doi.org/10.1093/restud/rdr047
.. [2] Sant'Anna, P. H., and Zhao, J. (2020), "Inverse Probability Weighting with
Missing Data," Journal of the American Statistical Association, 115(530), 1542-1552.
https://doi.org/10.1080/01621459.2019.1635520
"""
if np.all(iw == 0):
warnings.warn(
"All individual weights are zero. Propensity scores will be uninformative.",
UserWarning,
)
return np.full_like(D, np.nan, dtype=float)
unique_d = np.unique(D)
if not np.array_equal(unique_d, [0, 1]):
warnings.warn(
f"Treatment indicator D contains values {unique_d}. Expected binary values [0, 1]. "
"Results may be unreliable.",
UserWarning,
)
X_processed, _ = _remove_collinear_columns(X)
if quantiles is not None:
X_processed = _add_quantile_constraints(X_processed, quantiles, iw)
n_obs, k_features = X_processed.shape
init_gamma = _get_initial_gamma(D, X_processed, iw, k_features)
# Try trust-constr optimization first
try:
opt_cal_results = scipy.optimize.minimize(
lambda g, d_arr, x_arr, iw_arr: _loss_ps_cal(g, d_arr, x_arr, iw_arr)[0],
init_gamma.astype(np.float64),
args=(D, X_processed, iw),
method="trust-constr",
jac=lambda g, d_arr, x_arr, iw_arr: _loss_ps_cal(g, d_arr, x_arr, iw_arr)[1],
hess=lambda g, d_arr, x_arr, iw_arr: _loss_ps_cal(g, d_arr, x_arr, iw_arr)[2],
options={"maxiter": 1000},
)
if opt_cal_results.success:
gamma_cal = opt_cal_results.x
else:
raise RuntimeError("trust-constr did not converge")
except (ValueError, np.linalg.LinAlgError, RuntimeError, OverflowError) as e:
warnings.warn(f"trust-constr optimization failed: {e}. Using IPT algorithm.", UserWarning)
# Try IPT optimization
try:
opt_ipt_results = scipy.optimize.minimize(
lambda g, d_arr, x_arr, iw_arr, n: _loss_ps_ipt(g, d_arr, x_arr, iw_arr, n)[0],
init_gamma.astype(np.float64),
args=(D, X_processed, iw, n_obs),
method="BFGS",
jac=lambda g, d_arr, x_arr, iw_arr, n: _loss_ps_ipt(g, d_arr, x_arr, iw_arr, n)[1],
options={"maxiter": 10000, "gtol": 1e-06},
)
if opt_ipt_results.success:
gamma_cal = opt_ipt_results.x
else:
raise RuntimeError("IPT optimization did not converge") from None
except (ValueError, np.linalg.LinAlgError, RuntimeError, OverflowError) as e_ipt:
warnings.warn(f"IPT optimization failed: {e_ipt}. Using initial logit estimates.", UserWarning)
gamma_cal = init_gamma
# Validate logit fallback
try:
logit_model_refit = sm.GLM(D, X_processed, family=sm.families.Binomial(), freq_weights=iw)
logit_results_refit = logit_model_refit.fit(start_params=init_gamma, maxiter=100)
if not logit_results_refit.converged:
warnings.warn("Initial Logit model (used as fallback) also did not converge.", UserWarning)
except (ValueError, np.linalg.LinAlgError, RuntimeError, OverflowError):
warnings.warn("Checking convergence of fallback Logit model failed.", UserWarning)
# Compute propensity scores
pscore_linear = X_processed @ gamma_cal
pscore = scipy.special.expit(pscore_linear)
if np.any(np.isnan(pscore)):
warnings.warn(
"Propensity score model coefficients might have NA/Inf components. "
"Multicollinearity or lack of variation in covariates is a likely reason. "
"Resulting pscores contain NaNs.",
UserWarning,
)
return pscore
def _loss_ps_cal(gamma, D, X, iw):
"""Loss function for calibrated propensity score estimation using trust.
Parameters
----------
gamma : ndarray
Coefficient vector for propensity score model.
D : ndarray
Treatment indicator (1D array).
X : ndarray
Covariate matrix (2D array, n_obs x n_features), includes intercept.
iw : ndarray
Individual weights (1D array).
Returns
-------
tuple
(value, gradient, hessian)
"""
n_obs, k_features = X.shape
if np.any(np.isnan(gamma)):
return np.inf, np.full(k_features, np.nan), np.full((k_features, k_features), np.nan)
ps_ind = X @ gamma
ps_ind_clipped = np.clip(ps_ind, -500, 500)
exp_ps_ind = np.exp(ps_ind_clipped)
value = -np.mean(np.where(D, ps_ind, -exp_ps_ind) * iw)
grad_terms = np.where(D[:, np.newaxis], 1.0, -exp_ps_ind[:, np.newaxis]) * iw[:, np.newaxis] * X
gradient = -np.mean(grad_terms, axis=0)
hess_M_vector = np.where(D, 0.0, -exp_ps_ind) * iw
hessian_term_matrix = X * hess_M_vector[:, np.newaxis]
hessian = -(X.T @ hessian_term_matrix) / n_obs
return value, gradient, hessian
def _loss_ps_ipt(gamma, D, X, iw, n_obs):
"""Loss function for inverse probability tilting propensity score estimation.
Parameters
----------
gamma : ndarray
Coefficient vector for propensity score model.
D : ndarray
Treatment indicator (1D array).
X : ndarray
Covariate matrix (2D array, n_obs x n_features), includes intercept.
iw : ndarray
Individual weights (1D array).
n_obs : int
Number of observations.
Returns
-------
tuple
(value, gradient, hessian)
"""
k_features = X.shape[1]
if np.any(np.isnan(gamma)):
return np.inf, np.full(k_features, np.nan), np.full((k_features, k_features), np.nan)
log_n_minus_1 = np.log(n_obs - 1)
cn = -(n_obs - 1)
bn = -n_obs + (n_obs - 1) * log_n_minus_1
an = -(n_obs - 1) * (1 - log_n_minus_1 + 0.5 * (log_n_minus_1**2))
v_star = log_n_minus_1
v = X @ gamma
v_clipped = np.clip(v, -500, 500)
phi = np.where(v < v_star, -v - np.exp(v_clipped), an + bn * v + 0.5 * cn * (v**2))
phi1 = np.where(v < v_star, -1.0 - np.exp(v_clipped), bn + cn * v)
phi2 = np.where(v < v_star, -np.exp(v_clipped), cn)
value = -np.sum(iw * (1 - D) * phi + v)
grad_vec_term = iw * (1 - D) * phi1 + 1.0
gradient = -(X.T @ grad_vec_term)
hess_M_ipt_vector = (1 - D) * iw * phi2
hessian_term_matrix = X * hess_M_ipt_vector[:, np.newaxis]
hessian = -(hessian_term_matrix.T @ X)
return value, gradient, hessian
def _get_initial_gamma(D, X, iw, k_features):
"""Get initial gamma values for optimization."""
xp = get_backend()
if xp is not np:
try:
beta, _ = cupy_logistic_irls(
xp.asarray(D, dtype=xp.float64),
xp.asarray(X, dtype=xp.float64),
xp.asarray(iw, dtype=xp.float64),
)
return to_numpy(beta)
except (RuntimeError, np.linalg.LinAlgError):
return np.zeros(k_features)
try:
logit_model = sm.GLM(D, X, family=sm.families.Binomial(), freq_weights=iw)
logit_results = logit_model.fit(maxiter=100)
init_gamma = logit_results.params
if not logit_results.converged:
warnings.warn(
"Initial Logit model for IPT did not converge. Using pseudo-inverse for initial gamma.", UserWarning
)
try:
init_gamma = np.linalg.pinv(X.T @ (iw[:, np.newaxis] * X)) @ (X.T @ (iw * D))
except np.linalg.LinAlgError:
warnings.warn("Pseudo-inverse for initial gamma failed. Using zeros.", UserWarning)
init_gamma = np.zeros(k_features)
return init_gamma
except (np.linalg.LinAlgError, ValueError) as e:
warnings.warn(f"Initial Logit model failed: {e}. Using zeros for initial gamma.", UserWarning)
return np.zeros(k_features)
def _remove_collinear_columns(X):
"""Remove collinear columns from covariate matrix."""
n_obs, n_cols = X.shape
if n_cols == 1:
return X, []
_, R, P = scipy.linalg.qr(X, mode="economic", pivoting=True)
diag_R = np.abs(np.diag(R))
tol = diag_R[0] * max(n_obs, n_cols) * np.finfo(float).eps
rank = np.sum(diag_R > tol)
if rank == n_cols:
return X, []
independent_cols = P[:rank]
removed_cols = P[rank:]
if len(removed_cols) > 0:
warnings.warn(
f"Removed {len(removed_cols)} collinear column(s) from covariate matrix. "
f"Column indices removed: {removed_cols.tolist()}",
UserWarning,
)
return X[:, independent_cols], removed_cols.tolist()
def _add_quantile_constraints(X, quantiles, iw):
"""Add quantile balance constraints to covariate matrix."""
quantile_features = []
for col_idx, q_list in quantiles.items():
if col_idx >= X.shape[1]:
warnings.warn(f"Column index {col_idx} exceeds number of columns ({X.shape[1]}). Skipping.", UserWarning)
continue
if col_idx == 0 and np.all(X[:, 0] == X[0, 0]):
warnings.warn("Skipping quantile constraints for intercept column.", UserWarning)
continue
col_values = X[:, col_idx]
for q in q_list:
if not 0 < q < 1:
warnings.warn(f"Quantile {q} must be between 0 and 1. Skipping.", UserWarning)
continue
threshold = _weighted_quantile(col_values, q, iw)
q_indicator = (col_values <= threshold).astype(float)
quantile_features.append(q_indicator)
if quantile_features:
X_extended = np.column_stack([X, *quantile_features])
return X_extended
return X
def _weighted_quantile(values, q, weights):
"""Compute weighted quantile."""
sorted_idx = np.argsort(values)
sorted_values = values[sorted_idx]
sorted_weights = weights[sorted_idx]
# Cumulative weights
cum_weights = np.cumsum(sorted_weights)
total_weight = cum_weights[-1]
# Find quantile position
quantile_weight = q * total_weight
idx = np.searchsorted(cum_weights, quantile_weight)
if idx == 0:
return sorted_values[0]
if idx >= len(sorted_values):
return sorted_values[-1]
# Linear interpolation between adjacent values
w_below = cum_weights[idx - 1]
w_above = cum_weights[idx]
if w_above == w_below:
return sorted_values[idx]
# Interpolation factor
alpha = (quantile_weight - w_below) / (w_above - w_below)
return sorted_values[idx - 1] + alpha * (sorted_values[idx] - sorted_values[idx - 1])