# Source code for pystatsbio.diagnostic._roc

"""ROC curve analysis with DeLong confidence intervals and comparison test.

Implements the empirical ROC curve, AUC via Mann-Whitney U, DeLong
standard errors and CIs (logit-transformed), and the DeLong test for
comparing two correlated ROC curves.

References
----------
DeLong, DeLong & Clarke-Pearson (1988). Comparing the areas under two
or more correlated receiver operating characteristic curves: a
nonparametric approach.  *Biometrics*, 44(3), 837-845.

Validates against: R ``pROC::roc()``, ``pROC::ci.auc()``,
``pROC::roc.test()``.
"""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np
from numpy.typing import NDArray
from scipy import stats

from pystatsbio.diagnostic._common import ROCResult


# ---------------------------------------------------------------------------
# ROCTestResult
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class ROCTestResult:
    """Result of comparing two correlated ROC curves (DeLong test)."""

    statistic: float  # Z statistic for the AUC difference
    p_value: float    # two-sided p-value
    auc1: float       # AUC of the first curve
    auc2: float       # AUC of the second curve
    auc_diff: float   # auc1 - auc2
    method: str       # 'delong'

    def summary(self) -> str:
        """Return a human-readable, multi-line summary of the test."""
        title = "DeLong Test for Two Correlated ROC Curves"
        body = (
            f"AUC 1 : {self.auc1:.4f}",
            f"AUC 2 : {self.auc2:.4f}",
            f"Difference: {self.auc_diff:.4f}",
            f"Z : {self.statistic:.4f}",
            f"p-value : {self.p_value:.4g}",
        )
        return "\n".join((title, "=" * 45) + body)
# --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _validate_roc_inputs( response: NDArray, predictor: NDArray ) -> tuple[NDArray, NDArray]: """Validate and coerce inputs for ROC analysis.""" response = np.asarray(response, dtype=np.intp) predictor = np.asarray(predictor, dtype=np.float64) if response.ndim != 1 or predictor.ndim != 1: raise ValueError("response and predictor must be 1-D arrays") if response.shape[0] != predictor.shape[0]: raise ValueError( f"response and predictor must have the same length, " f"got {response.shape[0]} and {predictor.shape[0]}" ) unique_labels = np.unique(response) if not np.array_equal(unique_labels, np.array([0, 1])): raise ValueError( f"response must be binary (0/1), got unique values {unique_labels}" ) n1 = int(response.sum()) n0 = len(response) - n1 if n1 < 1 or n0 < 1: raise ValueError("Need at least one case and one control") return response, predictor def _resolve_direction( response: NDArray, predictor: NDArray, direction: str, ) -> str: """Choose direction if 'auto'.""" if direction == "auto": med_cases = np.median(predictor[response == 1]) med_controls = np.median(predictor[response == 0]) return "<" if med_controls <= med_cases else ">" if direction not in ("<", ">"): raise ValueError(f"direction must be '<', '>' or 'auto', got {direction!r}") return direction def _compute_auc_and_placements( response: NDArray, predictor: NDArray, direction: str, ) -> tuple[float, NDArray, NDArray]: """Compute AUC via ranks and DeLong placement values. Returns (auc, V10, V01) where V10 has shape (n1,) and V01 has shape (n0,). 
""" # If direction is '>', negate predictor so higher = positive if direction == ">": predictor = -predictor case_mask = response == 1 n1 = int(case_mask.sum()) n0 = len(response) - n1 # Pooled ranks (midrank for ties) pooled_ranks = stats.rankdata(predictor, method="average") # Ranks among cases only and controls only case_vals = predictor[case_mask] ctrl_vals = predictor[~case_mask] case_ranks_within = stats.rankdata(case_vals, method="average") ctrl_ranks_within = stats.rankdata(ctrl_vals, method="average") # AUC = (sum of case ranks in pooled - n1*(n1+1)/2) / (n1*n0) sum_case_ranks = pooled_ranks[case_mask].sum() auc = (sum_case_ranks - n1 * (n1 + 1) / 2) / (n1 * n0) # Placement values via rank difference # V10[i] = (pooled_rank_i - within_case_rank_i) / n0 V10 = (pooled_ranks[case_mask] - case_ranks_within) / n0 # V01[j] = 1 - (pooled_rank_j - within_ctrl_rank_j) / n1 # For controls, the "number of cases beaten" is: # V01[j] = (1 - direction-corrected placement) # Actually: V01[j] measures how many cases the control "loses" to. 
# V01[j] = 1 - (pooled_rank_j - within_ctrl_rank_j) / n1 V01 = 1.0 - (pooled_ranks[~case_mask] - ctrl_ranks_within) / n1 return float(auc), V10, V01 def _delong_variance( auc: float, V10: NDArray, V01: NDArray, n1: int, n0: int, ) -> float: """DeLong variance of AUC from placement values.""" S10 = np.var(V10, ddof=1) if n1 > 1 else 0.0 S01 = np.var(V01, ddof=1) if n0 > 1 else 0.0 return S10 / n1 + S01 / n0 def _logit_ci( auc: float, var_auc: float, conf_level: float, ) -> tuple[float, float]: """AUC confidence interval on logit scale (matching pROC default).""" z = stats.norm.ppf((1 + conf_level) / 2) # Clamp AUC away from 0/1 to avoid log(0) auc_c = np.clip(auc, 1e-10, 1.0 - 1e-10) logit_auc = np.log(auc_c / (1.0 - auc_c)) se_logit = np.sqrt(var_auc) / (auc_c * (1.0 - auc_c)) logit_lo = logit_auc - z * se_logit logit_hi = logit_auc + z * se_logit ci_lo = 1.0 / (1.0 + np.exp(-logit_lo)) ci_hi = 1.0 / (1.0 + np.exp(-logit_hi)) return float(ci_lo), float(ci_hi) # --------------------------------------------------------------------------- # Public API — roc() # ---------------------------------------------------------------------------
def roc(
    response: NDArray[np.integer],
    predictor: NDArray[np.floating],
    *,
    direction: str = "auto",
    conf_level: float = 0.95,
) -> ROCResult:
    """Compute empirical ROC curve with DeLong AUC confidence interval.

    Parameters
    ----------
    response : array of int
        Binary outcome (0/1).
    predictor : array of float
        Continuous predictor (biomarker value).
    direction : str
        ``'<'`` (controls < cases, higher predictor → positive),
        ``'>'`` (controls > cases, lower predictor → positive), or
        ``'auto'`` (choose direction giving AUC ≥ 0.5).
    conf_level : float
        Confidence level for AUC CI.

    Returns
    -------
    ROCResult

    Validates against: R ``pROC::roc()``, ``pROC::ci.auc()``
    """
    if not 0 < conf_level < 1:
        raise ValueError(f"conf_level must be in (0, 1), got {conf_level}")

    response, predictor = _validate_roc_inputs(response, predictor)
    direction = _resolve_direction(response, predictor, direction)

    n_pos = int(response.sum())
    n_neg = len(response) - n_pos

    # AUC plus the per-subject placement values feeding the DeLong SE.
    auc_val, V10, V01 = _compute_auc_and_placements(response, predictor, direction)

    # DeLong variance → standard error and logit-scale CI (pROC default).
    var_auc = _delong_variance(auc_val, V10, V01, n_pos, n_neg)
    ci_lo, ci_hi = _logit_ci(auc_val, var_auc, conf_level)

    # Full empirical curve from (0, 0) to (1, 1).
    thresholds, tpr, fpr = _empirical_roc_curve(response, predictor, direction)

    return ROCResult(
        thresholds=thresholds,
        tpr=tpr,
        fpr=fpr,
        auc=auc_val,
        auc_se=float(np.sqrt(var_auc)),
        auc_ci_lower=ci_lo,
        auc_ci_upper=ci_hi,
        conf_level=conf_level,
        n_positive=n_pos,
        n_negative=n_neg,
        direction=direction,
    )
def _empirical_roc_curve( response: NDArray, predictor: NDArray, direction: str, ) -> tuple[NDArray, NDArray, NDArray]: """Compute empirical ROC curve points. Returns (thresholds, tpr, fpr) sorted from (0,0) to (1,1). """ case_mask = response == 1 n1 = int(case_mask.sum()) n0 = len(response) - n1 # Unique thresholds sorted descending (for direction '<') unique_vals = np.unique(predictor) if direction == "<": # Higher predictor = positive: classify positive if predictor >= c # Sort thresholds from high to low sorted_thresh = np.sort(unique_vals)[::-1] tpr_list = [] fpr_list = [] for c in sorted_thresh: tpr_list.append(np.sum(predictor[case_mask] >= c) / n1) fpr_list.append(np.sum(predictor[~case_mask] >= c) / n0) # Prepend (0, 0) — at threshold above max, nobody classified positive # Append (1, 1) — at threshold below min, everybody classified positive thresholds = np.concatenate([ [np.inf], sorted_thresh, [-np.inf], ]) tpr_arr = np.array([0.0] + tpr_list + [1.0]) fpr_arr = np.array([0.0] + fpr_list + [1.0]) else: # Lower predictor = positive: classify positive if predictor <= c sorted_thresh = np.sort(unique_vals) tpr_list = [] fpr_list = [] for c in sorted_thresh: tpr_list.append(np.sum(predictor[case_mask] <= c) / n1) fpr_list.append(np.sum(predictor[~case_mask] <= c) / n0) thresholds = np.concatenate([ [-np.inf], sorted_thresh, [np.inf], ]) tpr_arr = np.array([0.0] + tpr_list + [1.0]) fpr_arr = np.array([0.0] + fpr_list + [1.0]) return thresholds, tpr_arr, fpr_arr # --------------------------------------------------------------------------- # Public API — roc_test() # ---------------------------------------------------------------------------
def roc_test(
    roc1: ROCResult,
    roc2: ROCResult,
    *,
    predictor1: NDArray[np.floating] | None = None,
    predictor2: NDArray[np.floating] | None = None,
    response: NDArray[np.integer] | None = None,
    method: str = "delong",
) -> ROCTestResult:
    """Compare two correlated ROC curves using DeLong's test.

    The two ROC curves must be computed on the **same** subjects (same
    response vector).  The original predictor values and shared response
    are required to compute the paired DeLong covariance.

    Parameters
    ----------
    roc1, roc2 : ROCResult
        Two ROC curves computed on the same subjects.
    predictor1, predictor2 : array of float
        Original predictor values for each marker.
    response : array of int
        Shared binary outcome.
    method : str
        ``'delong'`` (only supported method).

    Returns
    -------
    ROCTestResult

    Validates against: R ``pROC::roc.test()``
    """
    if method != "delong":
        raise ValueError(f"Only 'delong' method is supported, got {method!r}")
    if predictor1 is None or predictor2 is None or response is None:
        raise ValueError(
            "predictor1, predictor2, and response are required for DeLong test"
        )

    y = np.asarray(response, dtype=np.intp)
    x1 = np.asarray(predictor1, dtype=np.float64)
    x2 = np.asarray(predictor2, dtype=np.float64)
    if not (x1.shape[0] == len(y) == x2.shape[0]):
        raise ValueError("predictor1, predictor2, and response must have equal length")

    n1 = int(y.sum())
    n0 = len(y) - n1

    # Direction-adjusted placement values for each marker on the shared
    # subjects; the placements carry all the DeLong (co)variance information.
    auc1, cases_a, ctrls_a = _compute_auc_and_placements(y, x1, roc1.direction)
    auc2, cases_b, ctrls_b = _compute_auc_and_placements(y, x2, roc2.direction)

    # Var(AUC1), Var(AUC2) and their paired covariance.
    var1 = _delong_variance(auc1, cases_a, ctrls_a, n1, n0)
    var2 = _delong_variance(auc2, cases_b, ctrls_b, n1, n0)
    cov_cases = np.cov(cases_a, cases_b, ddof=1)[0, 1] if n1 > 1 else 0.0
    cov_ctrls = np.cov(ctrls_a, ctrls_b, ddof=1)[0, 1] if n0 > 1 else 0.0
    cov12 = cov_cases / n1 + cov_ctrls / n0

    # Var(AUC1 - AUC2), floored away from zero so the Z statistic is finite.
    var_diff = max(var1 + var2 - 2 * cov12, 1e-20)

    z_stat = (auc1 - auc2) / np.sqrt(var_diff)
    p_value = 2 * stats.norm.sf(abs(z_stat))

    return ROCTestResult(
        statistic=float(z_stat),
        p_value=float(p_value),
        auc1=auc1,
        auc2=auc2,
        auc_diff=auc1 - auc2,
        method="delong",
    )