Source code for mercurial.utils.validation

"""Validation utilities: cross‑validation, regularization, early stopping."""

from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
from sklearn.model_selection import KFold


class CrossValidator:
    """
    k-fold cross-validation for Atlas case simulations.

    Splits the case names into training and validation folds, runs the
    simulation of every validation case and reports one score per fold.
    """

    def __init__(self, n_splits: int = 5, shuffle: bool = True, random_seed: int = 42):
        """
        Parameters
        ----------
        n_splits : int
            Number of folds.
        shuffle : bool
            Whether the cases are shuffled before being split into folds.
        random_seed : int
            Seed used for shuffling; ignored when ``shuffle`` is False.
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_seed = random_seed
        # scikit-learn raises ValueError when a random_state is supplied
        # together with shuffle=False, so only forward the seed when it can
        # actually take effect.
        self.kf = KFold(
            n_splits=n_splits,
            shuffle=shuffle,
            random_state=random_seed if shuffle else None,
        )

    def validate(
        self,
        case_functions: Dict[str, Callable],
        parameter_sets: List[Dict[str, Any]],
        metric: Callable[[Dict], float] = lambda x: 1.0,
    ) -> Dict[str, Any]:
        """
        Perform cross-validation.

        Parameters
        ----------
        case_functions : dict
            Mapping case_name -> zero-argument function that runs the
            simulation and returns a results dict.
        parameter_sets : list of dict
            Parameter configurations to report on (e.g., different λ values).
            NOTE: the configurations are only recorded in the output; they are
            not passed to the case functions, which are called without
            arguments.
        metric : callable
            Function computing a score from the simulation results.

        Returns
        -------
        results : dict
            ``parameter_results`` (one entry per parameter set holding
            ``params``, ``fold_scores``, ``mean_score`` and ``std_score``),
            ``n_folds`` and ``n_cases``.

        Raises
        ------
        ValueError
            If there are fewer cases than folds.
        """
        case_names = list(case_functions.keys())
        n_cases = len(case_names)
        if n_cases < self.n_splits:
            raise ValueError(f"Only {n_cases} cases, cannot do {self.n_splits}-fold CV")

        results = []
        for params in parameter_sets:
            fold_scores = []
            # No calibration happens on the training part of the split yet, so
            # only the validation indices are needed: every validation case is
            # simply simulated and scored.
            for _, val_idx in self.kf.split(case_names):
                val_cases = [case_names[i] for i in val_idx]
                scores = []
                for case in val_cases:
                    try:
                        sim_results = case_functions[case]()
                        scores.append(metric(sim_results))
                    except Exception as e:
                        # Best effort: a failing case is reported and counted
                        # as a zero score instead of aborting the whole run.
                        print(f"Error on {case}: {e}")
                        scores.append(0.0)
                fold_scores.append(np.mean(scores))
            results.append(
                {
                    "params": params,
                    "fold_scores": fold_scores,
                    "mean_score": np.mean(fold_scores),
                    "std_score": np.std(fold_scores),
                }
            )
        return {"parameter_results": results, "n_folds": self.n_splits, "n_cases": n_cases}
class Regularization:
    """
    L2 regularization (ridge) for free energy or loss function.

    Adds λ * ||θ||₂² to the objective.
    """

    def __init__(self, lambda_reg: float = 0.01):
        """Store the regularization strength λ."""
        self.lambda_reg = lambda_reg

    def penalty(self, parameters: np.ndarray) -> float:
        """
        Compute the L2 penalty: λ * Σ θ_i².

        ``parameters`` may be any array-like (ndarray, list, tuple); it is
        converted with ``np.asarray`` so plain sequences work as well.
        """
        theta = np.asarray(parameters)
        return self.lambda_reg * np.sum(theta**2)

    def gradient(self, parameters: np.ndarray) -> np.ndarray:
        """
        Gradient of the penalty with respect to the parameters: 2λ * θ.

        ``parameters`` may be any array-like; the result is an ndarray of the
        same shape.
        """
        theta = np.asarray(parameters)
        return 2 * self.lambda_reg * theta
class EarlyStopping:
    """
    Early stopping to prevent overfitting during iterative calibration.

    Tracks the best validation loss seen so far and signals a stop once the
    loss has failed to improve for ``patience`` consecutive steps.
    """

    def __init__(self, patience: int = 5, min_delta: float = 1e-4):
        """
        Parameters
        ----------
        patience : int
            Number of consecutive non-improving steps tolerated.
        min_delta : float
            Minimum decrease of the loss that counts as an improvement.
        """
        self.patience, self.min_delta = patience, min_delta
        self.best_loss, self.counter, self.best_params = np.inf, 0, None

    def step(self, current_loss: float, current_params: Any) -> bool:
        """
        Record one iteration.

        Returns True while training should continue and False once the
        patience budget is used up.
        """
        improved = current_loss < self.best_loss - self.min_delta
        if not improved:
            self.counter += 1
            return self.counter < self.patience

        # New best: snapshot the parameters (deep copy, so later in-place
        # changes by the caller do not alter the stored best) and restart
        # the patience counter.
        self.best_loss = current_loss
        self.best_params = deepcopy(current_params)
        self.counter = 0
        return True

    def reset(self):
        """Forget all tracked state so the instance can be reused."""
        self.best_params = None
        self.counter = 0
        self.best_loss = np.inf
def bootstrap_confidence_intervals(
    scores: List[float],
    n_resamples: int = 1000,
    ci: float = 0.95,
    seed: Optional[int] = None,
) -> Tuple[float, float, float]:
    """
    Compute a percentile-bootstrap confidence interval for the mean of scores.

    Parameters
    ----------
    scores : list of float
        Observed scores; must not be empty.
    n_resamples : int
        Number of bootstrap resamples (at least 1).
    ci : float
        Confidence level in [0, 1], e.g. 0.95 for a 95% interval.
    seed : int, optional
        Seed for the random generator. Leave as None for a fresh,
        non-reproducible generator.

    Returns
    -------
    (mean, lower, upper)
        ``mean`` is the average of the bootstrap means; ``lower``/``upper``
        are the percentile bounds of the bootstrap means.

    Raises
    ------
    ValueError
        If ``scores`` is empty, ``n_resamples`` < 1 or ``ci`` is outside
        [0, 1].
    """
    # Convert once, outside the resampling loop.
    data = np.asarray(scores)
    n = len(data)
    if n == 0:
        raise ValueError("scores must contain at least one value")
    if n_resamples < 1:
        raise ValueError("n_resamples must be at least 1")
    if not 0.0 <= ci <= 1.0:
        raise ValueError("ci must lie in the interval [0, 1]")

    rng = np.random.default_rng(seed)
    means = np.array(
        [np.mean(rng.choice(data, size=n, replace=True)) for _ in range(n_resamples)]
    )
    mean = np.mean(means)
    lower = np.percentile(means, (1 - ci) / 2 * 100)
    upper = np.percentile(means, (1 + ci) / 2 * 100)
    return mean, lower, upper
# ============================================================================
# Permutation Tests (Priority 7.7)
# ============================================================================
def permutation_test(
    actual_scores: List[float], shuffled_scores: List[List[float]], n_permutations: int = 1000
) -> Dict[str, float]:
    """
    Perform a permutation test to assess statistical significance.

    Parameters
    ----------
    actual_scores : list of float
        Model scores on the original data (e.g., cross-validation folds).
    shuffled_scores : list of list of float
        For each permutation, a list of scores obtained after shuffling the
        relationship between inputs and outputs.
    n_permutations : int
        Accepted for interface compatibility but not used: the number of
        permutations is taken from ``len(shuffled_scores)``.

    Returns
    -------
    dict
        Contains 'p_value' (two-tailed), 'original_mean', 'mean_shuffled',
        'std_shuffled', and 'is_significant' (True if p < 0.05).

    Raises
    ------
    ValueError
        If ``actual_scores`` or ``shuffled_scores`` is empty.
    """
    if len(actual_scores) == 0:
        raise ValueError("actual_scores must contain at least one score")
    if len(shuffled_scores) == 0:
        raise ValueError("shuffled_scores must contain at least one permutation")

    original_mean = np.mean(actual_scores)
    # One mean per permutation forms the null distribution.
    perm_means = np.array([np.mean(scores) for scores in shuffled_scores])
    null_mean = np.mean(perm_means)

    # Two-tailed p-value: share of permutations whose mean lies at least as
    # far from the centre of the null distribution as the observed mean.
    p_value_two_tailed = np.mean(
        np.abs(perm_means - null_mean) >= np.abs(original_mean - null_mean)
    )
    return {
        "p_value": p_value_two_tailed,
        "original_mean": original_mean,
        "mean_shuffled": null_mean,
        "std_shuffled": np.std(perm_means),
        # Plain bool so the flag serializes and compares like a Python value.
        "is_significant": bool(p_value_two_tailed < 0.05),
    }
def generate_permuted_scores(
    model_func,
    X: np.ndarray,
    y: np.ndarray,
    n_permutations: int = 100,
    cv_folds: int = 5,
    random_state: Optional[int] = None,
) -> List[List[float]]:
    """
    Generate permuted scores by shuffling the target variable y.

    Parameters
    ----------
    model_func : callable
        Function that takes (X_train, y_train, X_test) and returns predictions.
    X : np.ndarray
        Feature matrix.
    y : np.ndarray
        Target values (to be shuffled).
    n_permutations : int
        Number of permutations.
    cv_folds : int
        Number of cross-validation folds.
    random_state : int, optional
        Seed for the target shuffles. When None (default) the global NumPy
        random state is used, exactly as before; the fold assignment itself
        always uses a fixed seed of 42.

    Returns
    -------
    list of list
        For each permutation, a list of fold scores. Each score is the
        negative mean absolute error, so higher is better.
    """
    # Imported once at function scope rather than on every fold iteration.
    from sklearn.metrics import mean_absolute_error
    from sklearn.model_selection import KFold

    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
    rng = None if random_state is None else np.random.default_rng(random_state)

    permuted_scores = []
    for _ in range(n_permutations):
        # Shuffling y breaks any real link between features and targets.
        y_shuffled = np.random.permutation(y) if rng is None else rng.permutation(y)
        fold_scores = []
        for train_idx, test_idx in kf.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y_shuffled[train_idx], y_shuffled[test_idx]
            preds = model_func(X_train, y_train, X_test)
            # Negative MAE so that "higher is better" holds for the score.
            fold_scores.append(-mean_absolute_error(y_test, preds))
        permuted_scores.append(fold_scores)
    return permuted_scores
# ============================================================================
# Underfitting Detection (Priority 7.2)
# ============================================================================
def underfitting_detection(
    train_errors: List[float],
    val_errors: List[float],
    baseline_error: float,
    threshold_ratio: float = 0.8,
) -> Dict:
    """
    Flag underfitting by comparing the model's validation error to a baseline.

    Parameters
    ----------
    train_errors : list
        Training errors (e.g., MAE) per fold or epoch.
    val_errors : list
        Validation errors per fold or epoch.
    baseline_error : float
        Error of a trivial baseline (e.g., predicting the mean).
    threshold_ratio : float
        The model counts as underfitting when its mean validation error is at
        least ``baseline_error * threshold_ratio``.

    Returns
    -------
    dict
        Keys: 'is_underfitting' (bool), 'reason' (str), 'model_val_error',
        'baseline_error' and 'improvement_ratio' (baseline error divided by
        model error; values above 1 mean the model is better).
    """
    val_mean = np.mean(val_errors)
    # The small epsilon keeps the ratio finite for a perfect (zero-error) model.
    ratio = baseline_error / (val_mean + 1e-12)

    def _report(flag: bool, text: str) -> Dict:
        return {
            "is_underfitting": flag,
            "reason": text,
            "model_val_error": val_mean,
            "baseline_error": baseline_error,
            "improvement_ratio": ratio,
        }

    if val_mean >= baseline_error * threshold_ratio:
        return _report(
            True,
            f"Model validation error ({val_mean:.4f}) not better than baseline ({baseline_error:.4f})",
        )

    message = f"Model outperforms baseline (improvement ratio: {ratio:.2f})"
    # The model beats the baseline, but if both error levels still exceed half
    # of the baseline error, annotate the verdict without changing the flag.
    half_baseline = baseline_error * 0.5
    if np.mean(train_errors) > half_baseline and val_mean > half_baseline:
        message += " (both train and val errors high – possible underfitting)"
    return _report(False, message)
def learning_curve_analysis(
    train_scores: List[float],
    val_scores: List[float],
    train_sizes: List[int],
    baseline_error: Optional[float] = None,
) -> Dict:
    """
    Analyze learning curves for underfitting signs using relative metrics.

    Parameters
    ----------
    train_scores : list
        Training scores (e.g., MAE) at different training set sizes.
    val_scores : list
        Validation scores.
    train_sizes : list
        Number of training samples used for each point of the curve.
    baseline_error : float, optional
        Error of a trivial baseline (e.g., predict mean). Underfitting can
        only be judged when it is provided.

    Returns
    -------
    dict
        Always contains the same keys:
        - 'converged': bool, True if the validation score plateaued (absolute
          slope between the last two points below 0.01)
        - 'gap': float, final train score minus final validation score
        - 'underfitting_suspected': bool, True if the baseline improvement
          ratio is below 1.2
        - 'final_train_score': float
        - 'final_val_score': float
        - 'improvement_over_baseline': float, or None without a baseline
    """
    if len(val_scores) < 2:
        # Too few points to estimate a slope; report neutral values but keep
        # the same key set as the regular result.
        return {
            "converged": False,
            "gap": 0.0,
            "underfitting_suspected": False,
            "final_train_score": train_scores[-1] if train_scores else 0.0,
            "final_val_score": val_scores[-1] if val_scores else 0.0,
            "improvement_over_baseline": None,
        }

    # Convergence: slope of the validation curve between the last two points.
    score_step = val_scores[-1] - val_scores[-2]
    size_step = train_sizes[-1] - train_sizes[-2]
    if size_step == 0:
        # Identical training sizes would divide by zero; the curve only counts
        # as flat if the score did not move at all.
        converged = bool(score_step == 0)
    else:
        converged = bool(abs(score_step / size_step) < 0.01)

    final_train = train_scores[-1]
    final_val = val_scores[-1]
    gap = final_train - final_val

    # Underfitting is suspected only when the model does not meaningfully
    # outperform the baseline. Without a baseline no judgement is made because
    # the error scale is unknown.
    underfitting_suspected = False
    improvement_over_baseline = None
    if baseline_error is not None:
        improvement_over_baseline = baseline_error / (final_val + 1e-12)
        underfitting_suspected = bool(improvement_over_baseline < 1.2)

    return {
        "converged": converged,
        "gap": gap,
        "underfitting_suspected": underfitting_suspected,
        "final_train_score": final_train,
        "final_val_score": final_val,
        "improvement_over_baseline": improvement_over_baseline,
    }