"""Numerical integration with fixed time steps and overflow protection."""
from dataclasses import dataclass
from typing import Callable, Dict, Tuple
import numpy as np
[docs]
@dataclass
class IntegrationConfig:
method: str = "RK45"
rtol: float = 1e-6
atol: float = 1e-9
max_step: float = 0.1
noise_amplitude: float = 0.01
[docs]
class StochasticIntegrator:
def __init__(self, config: IntegrationConfig):
self.config = config
self.noise_gen = np.random.default_rng()
def _add_noise(self, state: np.ndarray, dt: float, noise_strength: np.ndarray) -> np.ndarray:
noise = self.noise_gen.normal(0, np.sqrt(dt), size=len(state))
return state + noise_strength * noise
[docs]
def integrate(
self,
dynamics: Callable[[float, np.ndarray], np.ndarray],
initial_state: np.ndarray,
t_span: Tuple[float, float],
noise_strength: np.ndarray,
) -> Dict[str, np.ndarray]:
from scipy.integrate import solve_ivp
def det_dyn(t, y):
return dynamics(t, y)
solution = solve_ivp(
det_dyn,
t_span,
initial_state,
method=self.config.method,
rtol=self.config.rtol,
atol=self.config.atol,
max_step=self.config.max_step,
dense_output=False,
)
t_eval = solution.t
y_stoch = solution.y.copy()
for i in range(1, len(t_eval)):
dt = t_eval[i] - t_eval[i - 1]
y_stoch[:, i] = self._add_noise(y_stoch[:, i - 1], dt, noise_strength)
# Clip to prevent overflow
y_stoch[:, i] = np.clip(y_stoch[:, i], -1e10, 1e10)
return {"t": t_eval, "y": y_stoch}
[docs]
def integrate_fixed_step(
self,
dynamics: Callable[[float, np.ndarray], np.ndarray],
initial_state: np.ndarray,
t_span: Tuple[float, float],
dt: float,
noise_strength: np.ndarray,
) -> Dict[str, np.ndarray]:
"""
Fixed-step Euler–Maruyama with overflow protection.
"""
t_start, t_end = t_span
# If duration is extremely long, reduce dt proportionally to keep steps manageable
max_steps = 100000
total_duration = t_end - t_start
if total_duration / dt > max_steps:
dt = total_duration / max_steps
print(f"Warning: dt adjusted to {dt:.2f} to keep steps ≤ {max_steps}")
n_steps = int((t_end - t_start) / dt) + 1
t = np.linspace(t_start, t_end, n_steps)
y = np.zeros((len(initial_state), n_steps))
y[:, 0] = initial_state
# Clamp initial state
y[:, 0] = np.clip(y[:, 0], -1e10, 1e10)
for i in range(1, n_steps):
dt_actual = t[i] - t[i - 1]
# Compute dynamics, catch overflow
try:
dy_det = dynamics(t[i - 1], y[:, i - 1])
# Clip derivative to prevent blow‑up
dy_det = np.clip(dy_det, -1e6, 1e6)
dy = dy_det * dt_actual
except (OverflowError, FloatingPointError):
# If overflow, set derivative to zero and continue
dy = np.zeros_like(y[:, i - 1])
# Add noise
noise = self.noise_gen.normal(0, np.sqrt(dt_actual), size=len(initial_state))
dy += noise_strength * noise
y[:, i] = y[:, i - 1] + dy
# Clip final state
y[:, i] = np.clip(y[:, i], -1e10, 1e10)
# If any NaN appears, break and return what we have
if np.any(np.isnan(y[:, i])):
print(f"Warning: NaN detected at step {i}, stopping integration.")
return {"t": t[: i + 1], "y": y[:, : i + 1]}
return {"t": t, "y": y}