"""Validation utilities: cross‑validation, regularization, early stopping."""
from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
from sklearn.model_selection import KFold
class CrossValidator:
    """
    k-fold cross-validation for Atlas case simulations.

    Splits the case names into training and validation folds, runs the
    simulation of every validation case, and reports the metric per fold.

    Parameters
    ----------
    n_splits : int
        Number of folds.
    shuffle : bool
        Whether the cases are shuffled before being split into folds.
    random_seed : int
        Seed used for shuffling. Ignored when ``shuffle`` is False.
    """

    def __init__(self, n_splits: int = 5, shuffle: bool = True, random_seed: int = 42):
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_seed = random_seed
        # scikit-learn raises ValueError when random_state is set while
        # shuffle is False, so the seed is only forwarded when it has an effect.
        self.kf = KFold(
            n_splits=n_splits,
            shuffle=shuffle,
            random_state=random_seed if shuffle else None,
        )

    def validate(
        self,
        case_functions: Dict[str, Callable],
        parameter_sets: List[Dict[str, Any]],
        metric: Callable[[Dict], float] = lambda x: 1.0,
    ) -> Dict[str, Any]:
        """
        Perform cross-validation.

        Parameters
        ----------
        case_functions : dict
            Mapping case_name -> zero-argument function that runs the
            simulation and returns a results dict.
        parameter_sets : list of dict
            Parameter configurations to report on (e.g., different λ values).
        metric : callable
            Function computing a score from the simulation results.

        Returns
        -------
        results : dict
            ``parameter_results`` (one entry per parameter set with ``params``,
            ``fold_scores``, ``mean_score`` and ``std_score``), ``n_folds``
            and ``n_cases``.

        Raises
        ------
        ValueError
            If there are fewer cases than folds.

        Notes
        -----
        The case functions are called without arguments, so ``params`` is only
        echoed back in the output and does not influence the simulations. No
        calibration happens on the training folds; only the validation cases
        of each fold are evaluated.
        """
        case_names = list(case_functions)
        n_cases = len(case_names)
        if n_cases < self.n_splits:
            raise ValueError(f"Only {n_cases} cases, cannot do {self.n_splits}-fold CV")

        results = []
        for params in parameter_sets:
            fold_scores = []
            # The training indices are intentionally unused: there is no
            # calibration step, so only the validation cases are simulated.
            for _train_idx, val_idx in self.kf.split(case_names):
                scores = []
                for idx in val_idx:
                    case = case_names[idx]
                    try:
                        score = metric(case_functions[case]())
                    except Exception as e:
                        # Best effort: a failing case is reported and counted
                        # as a zero score instead of aborting the whole run.
                        print(f"Error on {case}: {e}")
                        score = 0.0
                    scores.append(score)
                fold_scores.append(np.mean(scores))
            results.append(
                {
                    "params": params,
                    "fold_scores": fold_scores,
                    "mean_score": np.mean(fold_scores),
                    "std_score": np.std(fold_scores),
                }
            )
        return {"parameter_results": results, "n_folds": self.n_splits, "n_cases": n_cases}
class Regularization:
    """
    L2 regularization (ridge) for free energy or loss function.

    Adds λ * ||θ||₂² to the objective.

    Parameters
    ----------
    lambda_reg : float
        Regularization strength λ.
    """

    def __init__(self, lambda_reg: float = 0.01):
        self.lambda_reg = lambda_reg

    def penalty(self, parameters: np.ndarray) -> float:
        """
        Compute the L2 penalty: λ * Σ θ_i².

        ``parameters`` may be any array-like (ndarray, list, tuple, scalar).
        """
        # Coerce first so plain Python sequences support element-wise powers.
        theta = np.asarray(parameters)
        return self.lambda_reg * np.sum(theta**2)

    def gradient(self, parameters: np.ndarray) -> np.ndarray:
        """
        Compute the gradient of the penalty: 2λ * θ.

        ``parameters`` may be any array-like; the result has the same shape.
        """
        # Without the coercion, ``float * list`` raises TypeError.
        theta = np.asarray(parameters)
        return 2 * self.lambda_reg * theta
class EarlyStopping:
    """
    Early-stopping monitor for iterative calibration.

    Tracks the lowest validation loss seen so far and signals a stop once
    the loss has failed to improve by more than ``min_delta`` for
    ``patience`` consecutive calls to :meth:`step`.
    """

    def __init__(self, patience: int = 5, min_delta: float = 1e-4):
        self.patience = patience
        self.min_delta = min_delta
        # Start from the pristine tracking state (best_loss, counter, best_params).
        self.reset()

    def step(self, current_loss: float, current_params: Any) -> bool:
        """
        Record one evaluation.

        Returns True if training should continue and False if it should stop.
        On an improvement, a deep copy of ``current_params`` is kept in
        ``best_params``.
        """
        improvement_threshold = self.best_loss - self.min_delta
        if not current_loss < improvement_threshold:
            # No sufficient improvement: spend one unit of patience.
            self.counter += 1
            return self.counter < self.patience

        self.best_params = deepcopy(current_params)
        self.best_loss = current_loss
        self.counter = 0
        return True

    def reset(self):
        """Forget the best loss and parameters and clear the patience counter."""
        self.best_params = None
        self.counter = 0
        self.best_loss = np.inf
def bootstrap_confidence_intervals(
    scores: List[float],
    n_resamples: int = 1000,
    ci: float = 0.95,
    random_seed: Optional[int] = None,
) -> Tuple[float, float, float]:
    """
    Compute a percentile-bootstrap confidence interval for the mean of scores.

    Parameters
    ----------
    scores : list of float
        Observed scores (e.g., per-fold metrics). Must not be empty.
    n_resamples : int
        Number of bootstrap resamples, each of size ``len(scores)`` drawn
        with replacement.
    ci : float
        Confidence level in [0, 1], e.g. 0.95 for a 95 % interval.
    random_seed : int, optional
        Seed for the random generator. ``None`` (default) draws fresh
        entropy, so results differ between calls.

    Returns
    -------
    (mean, lower, upper) : tuple of float
        ``mean`` is the average of the bootstrap means; ``lower`` and
        ``upper`` are the ``(1 - ci) / 2`` and ``(1 + ci) / 2`` percentiles
        of the bootstrap means.

    Raises
    ------
    ValueError
        If ``scores`` is empty, ``n_resamples`` < 1, or ``ci`` is outside [0, 1].
    """
    data = np.asarray(scores, dtype=float)
    n = len(data)
    if n == 0:
        raise ValueError("scores must contain at least one value")
    if n_resamples < 1:
        raise ValueError("n_resamples must be at least 1")
    if not 0.0 <= ci <= 1.0:
        raise ValueError(f"ci must be within [0, 1], got {ci}")

    rng = np.random.default_rng(random_seed)
    # Draw every resample in one call; row r holds the r-th bootstrap sample.
    resamples = rng.choice(data, size=(n_resamples, n), replace=True)
    means = resamples.reshape(n_resamples, -1).mean(axis=1)

    tail = (1.0 - ci) / 2.0
    lower, upper = np.percentile(means, [tail * 100.0, (1.0 - tail) * 100.0])
    return float(np.mean(means)), float(lower), float(upper)
# ============================================================================
# Permutation Tests (Priority 7.7)
# ============================================================================
def permutation_test(
    actual_scores: List[float], shuffled_scores: List[List[float]], n_permutations: int = 1000
) -> Dict[str, float]:
    """
    Perform a permutation test to assess statistical significance.

    Parameters
    ----------
    actual_scores : list of float
        Model scores on the original data (e.g., cross-validation folds).
    shuffled_scores : list of list of float
        For each permutation, the list of scores obtained after shuffling
        the relationship between inputs and outputs.
    n_permutations : int
        Unused; kept for backward compatibility. The number of permutations
        is taken from ``len(shuffled_scores)``.

    Returns
    -------
    dict
        Contains 'p_value' (two-tailed), 'mean_shuffled', 'std_shuffled',
        'original_mean', and 'is_significant' (True if p < 0.05).

    Raises
    ------
    ValueError
        If ``actual_scores`` or ``shuffled_scores`` is empty.

    Notes
    -----
    The p-value is ``(k + 1) / (n + 1)``, where ``k`` counts the permutations
    whose mean deviates from the permutation average at least as much as the
    original mean does, and ``n`` is the number of permutations. The +1 terms
    count the observed data as one of the permutations, so the p-value can
    never be exactly zero.
    """
    if len(actual_scores) == 0:
        raise ValueError("actual_scores must contain at least one value")
    if len(shuffled_scores) == 0:
        raise ValueError("shuffled_scores must contain at least one permutation")

    original_mean = float(np.mean(actual_scores))
    perm_means = np.array([np.mean(scores) for scores in shuffled_scores], dtype=float)

    # Two-tailed: measure extremeness as distance from the null centre.
    null_center = float(np.mean(perm_means))
    observed_deviation = abs(original_mean - null_center)
    n_extreme = int(np.count_nonzero(np.abs(perm_means - null_center) >= observed_deviation))
    p_value_two_tailed = (n_extreme + 1) / (perm_means.size + 1)

    return {
        "p_value": p_value_two_tailed,
        "original_mean": original_mean,
        "mean_shuffled": null_center,
        "std_shuffled": float(np.std(perm_means)),
        "is_significant": bool(p_value_two_tailed < 0.05),
    }
def generate_permuted_scores(
    model_func,
    X: np.ndarray,
    y: np.ndarray,
    n_permutations: int = 100,
    cv_folds: int = 5,
    random_seed: Optional[int] = None,
) -> List[List[float]]:
    """
    Generate permuted scores by shuffling the target variable y.

    Parameters
    ----------
    model_func : callable
        Function that takes (X_train, y_train, X_test) and returns predictions.
    X : np.ndarray
        Feature matrix (any array-like is accepted).
    y : np.ndarray
        Target values (to be shuffled); same length as ``X``.
    n_permutations : int
        Number of permutations.
    cv_folds : int
        Number of cross-validation folds.
    random_seed : int, optional
        Seed for the target shuffling. ``None`` (default) uses NumPy's global
        random state, as before.

    Returns
    -------
    list of list
        For each permutation, a list of fold scores (negative mean absolute
        error, so higher is better).

    Raises
    ------
    ValueError
        If ``X`` and ``y`` have different lengths.
    """
    # Imported once per call instead of once per fold.
    from sklearn.metrics import mean_absolute_error

    features = np.asarray(X)
    targets = np.asarray(y)
    if len(features) != len(targets):
        raise ValueError(
            f"X and y must have the same length, got {len(features)} and {len(targets)}"
        )

    if random_seed is None:
        permute = np.random.permutation
    else:
        permute = np.random.default_rng(random_seed).permutation

    # The fixed random_state makes the folds identical for every permutation,
    # so the split is computed a single time.
    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
    folds = list(kf.split(features))

    permuted_scores = []
    for _ in range(n_permutations):
        y_shuffled = permute(targets)
        fold_scores = []
        for train_idx, test_idx in folds:
            preds = model_func(features[train_idx], y_shuffled[train_idx], features[test_idx])
            # negative MAE, higher is better
            fold_scores.append(-mean_absolute_error(y_shuffled[test_idx], preds))
        permuted_scores.append(fold_scores)
    return permuted_scores
# ============================================================================
# Underfitting Detection (Priority 7.2)
# ============================================================================
def underfitting_detection(
    train_errors: List[float],
    val_errors: List[float],
    baseline_error: float,
    threshold_ratio: float = 0.8,
) -> Dict:
    """
    Detect underfitting by comparing model performance to a baseline.

    Parameters
    ----------
    train_errors : list
        Training errors (e.g., MAE) for each fold or epoch. May be empty, in
        which case the secondary "both errors high" hint is skipped.
    val_errors : list
        Validation errors. Must not be empty.
    baseline_error : float
        Error of a trivial baseline (e.g., predicting the mean).
    threshold_ratio : float
        If the mean validation error is >= baseline_error * threshold_ratio,
        the model is flagged as underfitting (baseline is better or comparable).

    Returns
    -------
    dict
        Contains:
        - 'is_underfitting': bool
        - 'reason': str
        - 'model_val_error': float
        - 'baseline_error': float
        - 'improvement_ratio': float (baseline / model error, >1 means model better)

    Raises
    ------
    ValueError
        If ``val_errors`` is empty.
    """
    # An empty list would give a NaN mean, every comparison below would be
    # False, and the function would wrongly report that the model wins.
    if len(val_errors) == 0:
        raise ValueError("val_errors must contain at least one value")

    mean_val_error = float(np.mean(val_errors))
    # The tiny epsilon guards against division by zero for a perfect model.
    improvement_ratio = baseline_error / (mean_val_error + 1e-12)

    is_under = bool(mean_val_error >= baseline_error * threshold_ratio)
    if is_under:
        reason = f"Model validation error ({mean_val_error:.4f}) not better than baseline ({baseline_error:.4f})"
    else:
        reason = f"Model outperforms baseline (improvement ratio: {improvement_ratio:.2f})"

    # Secondary hint: the model beats the baseline, yet both train and
    # validation errors are still above half of the baseline error.
    half_baseline = baseline_error * 0.5
    if (
        not is_under
        and len(train_errors) > 0
        and np.mean(train_errors) > half_baseline
        and mean_val_error > half_baseline
    ):
        reason += " (both train and val errors high – possible underfitting)"

    return {
        "is_underfitting": is_under,
        "reason": reason,
        "model_val_error": mean_val_error,
        "baseline_error": baseline_error,
        "improvement_ratio": improvement_ratio,
    }
def learning_curve_analysis(
    train_scores: List[float],
    val_scores: List[float],
    train_sizes: List[int],
    baseline_error: Optional[float] = None,
    convergence_tol: float = 0.01,
    min_improvement: float = 1.2,
) -> Dict:
    """
    Analyze learning curves for underfitting signs using relative metrics.

    Scores are treated as errors (lower is better), e.g. MAE.

    Parameters
    ----------
    train_scores : list
        Training scores (e.g., MAE) at different training set sizes.
    val_scores : list
        Validation scores.
    train_sizes : list
        Number of training samples used for each point of the curve.
    baseline_error : float, optional
        Error of a trivial baseline (e.g., predict mean). Underfitting is only
        judged when it is provided, because the error scale is otherwise unknown.
    convergence_tol : float
        The curve counts as converged when the absolute slope of the last
        validation segment (score change per training sample) is below this.
    min_improvement : float
        Minimum ratio baseline_error / final validation score for the model to
        count as clearly better than the baseline (1.2 = 20 % improvement).

    Returns
    -------
    dict
        Always contains:
        - 'converged': bool (if validation score plateaued)
        - 'gap': float (final train - val gap)
        - 'underfitting_suspected': bool (if model is not clearly better than baseline)
        - 'final_train_score': float
        - 'final_val_score': float
        - 'improvement_over_baseline': float, or None without a baseline

        With fewer than two validation scores no analysis is possible: the
        flags are False, 'gap' is 0.0, 'improvement_over_baseline' is None and
        missing final scores are reported as 0.0.

    Raises
    ------
    IndexError
        If two or more validation scores are given but ``train_scores`` is
        empty or ``train_sizes`` has fewer than two entries.
    """
    if len(val_scores) < 2:
        # Same keys as the full result so callers can rely on one schema.
        return {
            "converged": False,
            "gap": 0.0,
            "underfitting_suspected": False,
            "final_train_score": train_scores[-1] if len(train_scores) > 0 else 0.0,
            "final_val_score": val_scores[-1] if len(val_scores) > 0 else 0.0,
            "improvement_over_baseline": None,
        }

    # Convergence: slope of the last validation segment.
    score_step = val_scores[-1] - val_scores[-2]
    size_step = train_sizes[-1] - train_sizes[-2]
    if size_step != 0:
        last_slope = score_step / size_step
    else:
        # Repeated training size: the slope is undefined. A flat segment still
        # counts as converged, any change in score does not.
        last_slope = 0.0 if score_step == 0 else float("inf")
    converged = bool(abs(last_slope) < convergence_tol)

    final_train = train_scores[-1]
    final_val = val_scores[-1]
    gap = final_train - final_val

    underfitting_suspected = False
    improvement_over_baseline = None
    if baseline_error is not None:
        # The tiny epsilon guards against division by zero for a perfect model.
        improvement_over_baseline = baseline_error / (final_val + 1e-12)
        underfitting_suspected = bool(improvement_over_baseline < min_improvement)

    return {
        "converged": converged,
        "gap": gap,
        "underfitting_suspected": underfitting_suspected,
        "final_train_score": final_train,
        "final_val_score": final_val,
        "improvement_over_baseline": improvement_over_baseline,
    }