Source code for moderndid.npiv.lepski

"""Lepski method for optimal dimension selection in nonparametric instrumental variables."""

import numpy as np

from ..cupy.backend import get_backend, to_device, to_numpy
from .estimators import _ginv, npiv_est
from .prodspline import prodspline
from .utils import _quantile_basis, avoid_zero_division, basis_dimension, matrix_sqrt


[docs] def npiv_j( y, x, w, x_grid=None, j_x_degree=3, k_w_degree=4, j_x_segments_set=None, k_w_segments_set=None, knots="uniform", basis="tensor", x_min=None, x_max=None, w_min=None, w_max=None, grid_num=50, biters=99, alpha=0.5, check_is_fullrank=False, seed=None, ): r"""Implement Lepski's method for optimal sieve dimension selection. Implements the bootstrap-based test from [1]_ for selecting the optimal number of B-spline basis functions in nonparametric instrumental variables (NPIV) estimation. The method compares estimates across a grid of sieve dimensions :math:`\hat{\mathcal{J}}`. For each pair :math:`(J, J_2)` with :math:`J_2 > J`, it computes a sup-t-statistic for the difference in estimates .. math:: \sup_{x \in \mathcal{X}} \left| \frac{\hat{h}_J(x) - \hat{h}_{J_2}(x)}{\hat{\sigma}_{J, J_2}(x)} \right|. The optimal dimension :math:`\hat{J}` is the smallest :math:`J \in \hat{\mathcal{J}}` for which this statistic is below a bootstrap critical value :math:`\theta_{1-\hat{\alpha}}^*` for all :math:`J_2 > J`. The bootstrap critical value is the :math:`(1-\hat{\alpha})` quantile of the multiplier bootstrap process .. math:: \sup_{\left\{\left(x, J, J_{2}\right) \in \mathcal{X} \times \hat{\mathcal{J}} \times \hat{\mathcal{J}}: J_{2}>J\right\}} \left|\frac{D_{J}^{*}(x)-D_{J_{2}}^{*}(x)}{\hat{\sigma}_{J, J_{2}}(x)}\right|, where :math:`D_J^*(x) = (\psi^J(x))' \mathbf{M}_J \hat{\mathbf{u}}_J^*` is a multiplier bootstrap version of the estimation error, and :math:`\hat{\sigma}_{J, J_2}^2(x)` is the estimated variance of the difference in estimators. This procedure avoids the need to select tuning parameters for the test itself and performs well in practice. Parameters ---------- y : ndarray Dependent variable vector. x : ndarray Endogenous regressor matrix. w : ndarray Instrument matrix. x_grid : ndarray, optional Grid points for evaluation. If None, created automatically. j_x_degree : int, default=3 Degree of B-spline basis for :math:`X`. k_w_degree : int, default=4 Degree of B-spline basis for :math:`W`. j_x_segments_set : ndarray, optional Set of :math:`J` values to test. If None, uses [1, 3, 7, 15, 31, 63]. k_w_segments_set : ndarray, optional Set of :math:`K` values to test. If None, computed from :math:`J` values. knots : {"uniform", "quantiles"}, default="uniform" Knot placement method. basis : {"tensor", "additive", "glp"}, default="tensor" Type of basis. x_min, x_max, w_min, w_max : float, optional Range limits for basis construction. grid_num : int, default=50 Number of grid points for evaluation. biters : int, default=99 Number of bootstrap replications. alpha : float, default=0.5 Significance level for test. check_is_fullrank : bool, default=False Whether to check for full rank. seed : int, optional Random seed for reproducibility. Returns ------- dict Dictionary containing: - **j_tilde**: Selected J value - **j_hat**: Unadjusted Lepski choice - **j_hat_n**: Truncated value - **j_x_seg**: Final selected J segments - **k_w_seg**: Corresponding K segments - **theta_star**: Bootstrap critical value See Also -------- npiv_choose_j : Full data-driven selection procedure npiv_jhat_max : Compute maximum feasible dimension References ---------- .. [1] Chen, X., Christensen, T. M., & Kankanala, S. (2024). Adaptive Estimation and Uniform Confidence Bands for Nonparametric Structural Functions and Elasticities. https://arxiv.org/abs/2107.11869. """ xp = get_backend() y = xp.asarray(y).ravel() x = xp.atleast_2d(xp.asarray(x)) w = xp.atleast_2d(xp.asarray(w)) n = len(y) p_x = x.shape[1] p_w = w.shape[1] if j_x_segments_set is None: j_x_segments_set = np.array([2, 4, 8, 16, 32, 64]) if k_w_segments_set is None: k_w_segments_set = np.array([2, 4, 8, 16, 32, 64]) if x_grid is None: x_np = to_numpy(x) x_grid = np.zeros((grid_num, p_x)) for j in range(p_x): x_grid[:, j] = np.linspace(x_np[:, j].min(), x_np[:, j].max(), grid_num) x_grid = to_device(x_grid) j1_j2_pairs = [] for i, j1 in enumerate(j_x_segments_set): for j, j2 in enumerate(j_x_segments_set[i + 1 :], i + 1): j1_j2_pairs.append((i, j, j1, j2)) n_pairs = len(j1_j2_pairs) z_sup = np.zeros(n_pairs) z_sup_boot = np.zeros((biters, n_pairs)) rng = np.random.default_rng(seed) boot_draws_all_np = rng.normal(0, 1, (biters, n)) boot_draws_all = to_device(boot_draws_all_np) for pair_idx, (i, j, j1, j2) in enumerate(j1_j2_pairs): k1 = k_w_segments_set[i] k2 = k_w_segments_set[j] try: result_j1 = npiv_est( y=y, x=x, w=w, x_eval=x_grid, basis=basis, j_x_degree=j_x_degree, j_x_segments=j1, k_w_degree=k_w_degree, k_w_segments=k1, knots=knots, check_is_fullrank=check_is_fullrank, w_min=w_min, w_max=w_max, x_min=x_min, x_max=x_max, ) result_j2 = npiv_est( y=y, x=x, w=w, x_eval=x_grid, basis=basis, j_x_degree=j_x_degree, j_x_segments=j2, k_w_degree=k_w_degree, k_w_segments=k2, knots=knots, check_is_fullrank=check_is_fullrank, w_min=w_min, w_max=w_max, x_min=x_min, x_max=x_max, ) # basis and influence matrices for j1 psi_x_j1_eval, tmp_j1 = _compute_basis_and_influence( x, w, x_grid, j1, k1, j_x_degree, k_w_degree, p_x, p_w, knots, basis, x_min, x_max, w_min, w_max ) # basis and influence matrices for j2 psi_x_j2_eval, tmp_j2 = _compute_basis_and_influence( x, w, x_grid, j2, k2, j_x_degree, k_w_degree, p_x, p_w, knots, basis, x_min, x_max, w_min, w_max ) u_j1 = result_j1.residuals u_j2 = result_j2.residuals z_sup[pair_idx], asy_se, (tmp_j1, tmp_j2, u_j1, u_j2) = _compute_test_statistic( result_j1, result_j2, psi_x_j1_eval, psi_x_j2_eval, tmp_j1, tmp_j2, u_j1, u_j2 ) z_sup_boot[:, pair_idx] = _bootstrap_comparison( psi_x_j1_eval, psi_x_j2_eval, tmp_j1, tmp_j2, u_j1, u_j2, asy_se, boot_draws_all, biters ) except (ValueError, np.linalg.LinAlgError): z_sup[pair_idx] = np.inf z_sup_boot[:, pair_idx] = np.inf z_boot_max = np.max(z_sup_boot, axis=1) theta_star = _quantile_basis(z_boot_max, 1 - alpha) # Lepski selection j_seg, test_mat = _select_optimal_dimension(z_sup, j1_j2_pairs, j_x_segments_set, theta_star) j_hat = basis_dimension( basis=basis, degree=np.full(p_x, j_x_degree), segments=np.full(p_x, j_seg), ) # truncated value (second-largest j) j_seg_n = j_x_segments_set[-2] if len(j_x_segments_set) > 1 else j_seg j_hat_n = basis_dimension( basis=basis, degree=np.full(p_x, j_x_degree), segments=np.full(p_x, j_seg_n), ) j_x_seg = min(j_seg, j_seg_n) j_tilde = min(j_hat, j_hat_n) k_w_seg = k_w_segments_set[np.where(j_x_segments_set == j_x_seg)[0][0]] return { "j_tilde": j_tilde, "j_hat": j_hat, "j_hat_n": j_hat_n, "j_x_seg": j_x_seg, "k_w_seg": k_w_seg, "theta_star": theta_star, "test_matrix": test_mat, "z_sup": z_sup, }
[docs] def npiv_jhat_max( x, w, j_x_degree=3, k_w_degree=4, k_w_smooth=2, knots="uniform", basis="tensor", x_min=None, x_max=None, w_min=None, w_max=None, ): r"""Determine the upper limit of the sieve dimension grid. Computes the maximum feasible number of B-spline basis functions, :math:`\hat{J}_{\max}`, based on the sample size and an estimate of the sieve measure of ill-posedness, :math:`\hat{s}_J`. This serves as the upper bound for the grid of dimensions searched over in the Lepski-style selection procedure. :math:`\hat{J}_{\max}` is defined as the largest :math:`J` in a dyadic grid :math:`\mathcal{T}` that satisfies .. math:: J \sqrt{\log J} \hat{s}_{J}^{-1} \leq c \sqrt{n} for a constant :math:`c` (here, 10). The term :math:`\hat{s}_J` is the smallest singular value of a matrix related to the instrumented basis functions, which captures the degree of ill-posedness. Parameters ---------- x : ndarray Endogenous regressor matrix. w : ndarray Instrument matrix. j_x_degree : int, default=3 Degree of B-spline basis for X. k_w_degree : int, default=4 Degree of B-spline basis for W. k_w_smooth : int, default=2 Smoothness parameter for K selection. knots : {"uniform", "quantiles"}, default="uniform" Knot placement method. basis : {"tensor", "additive", "glp"}, default="tensor" Type of basis. x_min, x_max, w_min, w_max : float, optional Range limits for basis construction. Returns ------- dict Dictionary containing: - **j_x_segments_set**: Array of J values to test - **k_w_segments_set**: Corresponding K values - **j_hat_max**: Maximum feasible dimension - **alpha_hat**: Recommended alpha for testing See Also -------- npiv_choose_j : Full data-driven selection procedure npiv_j : Lepski-style selection procedure References ---------- .. [1] Chen, X., Christensen, T. M., & Kankanala, S. (2024). Adaptive Estimation and Uniform Confidence Bands for Nonparametric Structural Functions and Elasticities. https://arxiv.org/abs/2107.11869. """ x = np.atleast_2d(x) w = np.atleast_2d(w) n = x.shape[0] p_x = x.shape[1] p_w = w.shape[1] is_regression = np.array_equal(x, w) L_max = max(int(np.floor(np.log(n) / np.log(2 * p_x))), 3) j_x_segments_set = 2 ** np.arange(L_max + 1) k_w_segments_set = 2 ** (np.arange(L_max + 1) + k_w_smooth) test_val = np.full(L_max + 1, np.nan) for i in range(L_max + 1): if i <= 1 or (i > 1 and test_val[i - 2] <= 10 * np.sqrt(n)): j_x_segments = j_x_segments_set[i] k_w_segments = k_w_segments_set[i] if is_regression: s_hat_j = max(1, (0.1 * np.log(n)) ** 4) else: s_hat_j = _compute_sieve_measure( x, w, j_x_segments, k_w_segments, j_x_degree, k_w_degree, p_x, p_w, knots, basis, x_min, x_max, w_min, w_max, n, ) j_x_dim = basis_dimension( basis=basis, degree=np.full(p_x, j_x_degree), segments=np.full(p_x, j_x_segments), ) test_val[i] = j_x_dim * np.sqrt(np.log(j_x_dim)) * max((0.1 * np.log(n)) ** 4, 1 / s_hat_j) elif i > 1: test_val[i] = test_val[i - 1] valid_idx = np.where((test_val[:-1] <= 10 * np.sqrt(n)) & (10 * np.sqrt(n) < test_val[1:]))[0] L_hat_max = valid_idx[0] + 1 if len(valid_idx) > 0 else L_max j_x_segments_set = j_x_segments_set[:L_hat_max] k_w_segments_set = k_w_segments_set[:L_hat_max] j_hat_max = basis_dimension( basis=basis, degree=np.full(p_x, j_x_degree), segments=np.full(p_x, j_x_segments_set[-1]), ) alpha_hat = min(0.5, np.sqrt(np.log(j_hat_max) / j_hat_max)) return { "j_x_segments_set": j_x_segments_set, "k_w_segments_set": k_w_segments_set, "j_hat_max": j_hat_max, "alpha_hat": alpha_hat, }
def _compute_sieve_measure( x, w, j_x_segments, k_w_segments, j_x_degree, k_w_degree, p_x, p_w, knots, basis, x_min, x_max, w_min, w_max, n ): """Compute sieve measure of ill-posedness.""" xp = get_backend() try: K_x = np.column_stack([np.full(p_x, j_x_degree), np.full(p_x, j_x_segments - 1)]) K_w = np.column_stack([np.full(p_w, k_w_degree), np.full(p_w, k_w_segments - 1)]) psi_x = prodspline( x=x, K=K_x, knots=knots, basis=basis, x_min=np.full(p_x, x_min) if x_min else None, x_max=np.full(p_x, x_max) if x_max else None, ).basis b_w = prodspline( x=w, K=K_w, knots=knots, basis=basis, x_min=np.full(p_w, w_min) if w_min else None, x_max=np.full(p_w, w_max) if w_max else None, ).basis psi_x_gram_sqrt = matrix_sqrt(_ginv(psi_x.T @ psi_x)) b_w_gram_sqrt = matrix_sqrt(_ginv(b_w.T @ b_w)) svd_matrix = psi_x_gram_sqrt @ (psi_x.T @ b_w) @ b_w_gram_sqrt s_val = float(xp.min(xp.linalg.svd(svd_matrix, compute_uv=False))) s_hat_j = s_val if s_val > 0 else max(1, (0.1 * np.log(n)) ** 4) except (ValueError, np.linalg.LinAlgError): s_hat_j = max(1, (0.1 * np.log(n)) ** 4) return s_hat_j def _compute_basis_and_influence( x, w, x_grid, j_seg, k_seg, j_x_degree, k_w_degree, p_x, p_w, knots, basis, x_min, x_max, w_min, w_max ): """Compute basis matrices and influence components for a given dimension pair.""" K_x = np.column_stack([np.full(p_x, j_x_degree), np.full(p_x, j_seg - 1)]) K_w = np.column_stack([np.full(p_w, k_w_degree), np.full(p_w, k_seg - 1)]) psi_x = prodspline( x=x, K=K_x, knots=knots, basis=basis, x_min=np.full(p_x, x_min) if x_min else None, x_max=np.full(p_x, x_max) if x_max else None, ).basis b_w = prodspline( x=w, K=K_w, knots=knots, basis=basis, x_min=np.full(p_w, w_min) if w_min else None, x_max=np.full(p_w, w_max) if w_max else None, ).basis psi_x_eval = prodspline( x=x, xeval=x_grid, K=K_x, knots=knots, basis=basis, x_min=np.full(p_x, x_min) if x_min else None, x_max=np.full(p_x, x_max) if x_max else None, ).basis # influence matrices btb_inv = _ginv(b_w.T @ b_w) design_matrix = psi_x.T @ b_w @ btb_inv @ b_w.T gram_inv = _ginv(design_matrix @ psi_x) tmp_matrix = gram_inv @ design_matrix return psi_x_eval, tmp_matrix def _compute_test_statistic(result_j1, result_j2, psi_x_eval_j1, psi_x_eval_j2, tmp_j1, tmp_j2, u_j1, u_j2): """Compute sup-t test statistic for comparing two estimators.""" xp = get_backend() # variance components D_j1_inv_rho = tmp_j1.T * u_j1[:, None] D_j1_var = D_j1_inv_rho.T @ D_j1_inv_rho var_j1 = xp.diag(psi_x_eval_j1 @ D_j1_var @ psi_x_eval_j1.T) D_j2_inv_rho = tmp_j2.T * u_j2[:, None] D_j2_var = D_j2_inv_rho.T @ D_j2_inv_rho var_j2 = xp.diag(psi_x_eval_j2 @ D_j2_var @ psi_x_eval_j2.T) # cross-covariance cov_j1_j2 = xp.diag(psi_x_eval_j1 @ (D_j1_inv_rho.T @ D_j2_inv_rho) @ psi_x_eval_j2.T) asy_var_diff = var_j1 + var_j2 - 2 * cov_j1_j2 asy_se = xp.sqrt(xp.maximum(asy_var_diff, 0)) # sup t-statistic diff = result_j1.h - result_j2.h z_sup = float(xp.max(xp.abs(diff) / avoid_zero_division(asy_se))) return z_sup, asy_se, (tmp_j1, tmp_j2, u_j1, u_j2) def _bootstrap_comparison(psi_x_eval_j1, psi_x_eval_j2, tmp_j1, tmp_j2, u_j1, u_j2, asy_se, boot_draws_all, biters): """Perform bootstrap test for dimension pair comparison.""" xp = get_backend() z_boot = np.zeros(biters) for b in range(biters): boot_draws = boot_draws_all[b] boot_diff_j1 = psi_x_eval_j1 @ (tmp_j1 @ (u_j1 * boot_draws)) boot_diff_j2 = psi_x_eval_j2 @ (tmp_j2 @ (u_j2 * boot_draws)) boot_diff = boot_diff_j1 - boot_diff_j2 z_boot[b] = float(xp.max(xp.abs(boot_diff) / avoid_zero_division(asy_se))) return z_boot def _select_optimal_dimension(z_sup, j1_j2_pairs, j_x_segments_set, theta_star): """Apply Lepski selection criterion to choose optimal dimension.""" num_j = len(j_x_segments_set) test_mat = np.full((num_j, num_j), np.nan) for pair_idx, (i, j, _, _) in enumerate(j1_j2_pairs): test_mat[i, j] = z_sup[pair_idx] <= 1.1 * theta_star # check which j values pass the test test_vec = np.zeros(num_j - 1) for i in range(num_j - 1): test_vec[i] = np.all(test_mat[i, (i + 1) : num_j] == 1) if np.any(test_vec == 1): j_seg = j_x_segments_set[np.where(test_vec == 1)[0][0]] elif np.all(test_vec == 0) or np.all(np.isnan(test_vec)): j_seg = j_x_segments_set[-1] else: j_seg = j_x_segments_set[0] return j_seg, test_mat