Source code for mercurial.simulation.integrator

"""Numerical integration with fixed time steps and overflow protection."""

from dataclasses import dataclass
from typing import Callable, Dict, Tuple

import numpy as np


[docs] @dataclass class IntegrationConfig: method: str = "RK45" rtol: float = 1e-6 atol: float = 1e-9 max_step: float = 0.1 noise_amplitude: float = 0.01
[docs] class StochasticIntegrator: def __init__(self, config: IntegrationConfig): self.config = config self.noise_gen = np.random.default_rng() def _add_noise(self, state: np.ndarray, dt: float, noise_strength: np.ndarray) -> np.ndarray: noise = self.noise_gen.normal(0, np.sqrt(dt), size=len(state)) return state + noise_strength * noise
[docs] def integrate( self, dynamics: Callable[[float, np.ndarray], np.ndarray], initial_state: np.ndarray, t_span: Tuple[float, float], noise_strength: np.ndarray, ) -> Dict[str, np.ndarray]: from scipy.integrate import solve_ivp def det_dyn(t, y): return dynamics(t, y) solution = solve_ivp( det_dyn, t_span, initial_state, method=self.config.method, rtol=self.config.rtol, atol=self.config.atol, max_step=self.config.max_step, dense_output=False, ) t_eval = solution.t y_stoch = solution.y.copy() for i in range(1, len(t_eval)): dt = t_eval[i] - t_eval[i - 1] y_stoch[:, i] = self._add_noise(y_stoch[:, i - 1], dt, noise_strength) # Clip to prevent overflow y_stoch[:, i] = np.clip(y_stoch[:, i], -1e10, 1e10) return {"t": t_eval, "y": y_stoch}
[docs] def integrate_fixed_step( self, dynamics: Callable[[float, np.ndarray], np.ndarray], initial_state: np.ndarray, t_span: Tuple[float, float], dt: float, noise_strength: np.ndarray, ) -> Dict[str, np.ndarray]: """ Fixed-step Euler–Maruyama with overflow protection. """ t_start, t_end = t_span # If duration is extremely long, reduce dt proportionally to keep steps manageable max_steps = 100000 total_duration = t_end - t_start if total_duration / dt > max_steps: dt = total_duration / max_steps print(f"Warning: dt adjusted to {dt:.2f} to keep steps ≤ {max_steps}") n_steps = int((t_end - t_start) / dt) + 1 t = np.linspace(t_start, t_end, n_steps) y = np.zeros((len(initial_state), n_steps)) y[:, 0] = initial_state # Clamp initial state y[:, 0] = np.clip(y[:, 0], -1e10, 1e10) for i in range(1, n_steps): dt_actual = t[i] - t[i - 1] # Compute dynamics, catch overflow try: dy_det = dynamics(t[i - 1], y[:, i - 1]) # Clip derivative to prevent blow‑up dy_det = np.clip(dy_det, -1e6, 1e6) dy = dy_det * dt_actual except (OverflowError, FloatingPointError): # If overflow, set derivative to zero and continue dy = np.zeros_like(y[:, i - 1]) # Add noise noise = self.noise_gen.normal(0, np.sqrt(dt_actual), size=len(initial_state)) dy += noise_strength * noise y[:, i] = y[:, i - 1] + dy # Clip final state y[:, i] = np.clip(y[:, i], -1e10, 1e10) # If any NaN appears, break and return what we have if np.any(np.isnan(y[:, i])): print(f"Warning: NaN detected at step {i}, stopping integration.") return {"t": t[: i + 1], "y": y[:, : i + 1]} return {"t": t, "y": y}