"""Main simulation engine combining all modules."""
from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
from mercurial.branches.alignment import BranchAlignment
from mercurial.branches.branch import RealityBranch
from mercurial.consciousness.biasing import TopDownBiasing
from mercurial.consciousness.perception import VariationalPerception
from mercurial.core.patterns import NeuralPattern
from mercurial.core.state_space import HilbertSpace
from mercurial.core.thermodynamics import EffectiveTemperature, FullFreeEnergy
from mercurial.core.wilson_cowan import WilsonCowanPopulation
from mercurial.impressions.persistence import ImpressionDynamics
from mercurial.params.empirical import (
DecoherenceParams,
EntanglementParams,
FDTDParams,
HairCellParams,
HebbianParams,
HopfParams,
ImpressionParams,
JansenRitParams,
KuramotoParams,
LadderCouplingParams,
NeuralFieldParams,
PatternCompletionParams,
PatternFormationParams,
PhotoreceptorParams,
QFTParams,
ThermodynamicParams,
WilsonCowanParams,
)
from mercurial.simulation.integrator import IntegrationConfig, StochasticIntegrator
from mercurial.spectral.modalities import Modality, ModalityEnergyHierarchy
from mercurial.spectral.perception import BayesianPerception
from mercurial.spectral.sequencing import TemporalSequencing
[docs]
class SimulationEngine:
"""Orchestrates multi‑branch, multi‑pattern simulations."""
def __init__(self, dim: int = 50):
from mercurial.params.empirical import (
HopfParams,
JansenRitParams,
KuramotoParams,
WilsonCowanParams,
)
self.space = HilbertSpace(dim)
self.branches: List[RealityBranch] = []
self.alignment: Optional[BranchAlignment] = None
self.integrator = StochasticIntegrator(IntegrationConfig())
self.impression_dynamics = ImpressionDynamics()
self.spectral_hierarchy = ModalityEnergyHierarchy()
self.spectral_sequencing = TemporalSequencing(self.spectral_hierarchy)
self.perception = BayesianPerception(self.spectral_hierarchy)
self.biasing = TopDownBiasing(kappa_bias=0.1)
self.perception = VariationalPerception(prior_strength=1.0, learning_rate=0.1)
self.temperature = EffectiveTemperature(initial_temp=1.0, env_temp=1.0)
self.free_energy_func = FullFreeEnergy(temperature=self.temperature)
self.wc_params = WilsonCowanParams()
self.jr_params = JansenRitParams()
self.hopf_params = HopfParams()
self.kuramoto_params = KuramotoParams()
self.apply_empirical_parameters()
[docs]
def set_alignment_from_levels(self, base_similarities: Optional[np.ndarray] = None):
"""
Initialize alignment using branch levels.
Parameters
----------
base_similarities : np.ndarray, optional
Base isomorphism matrix (N_branches x N_branches) before level adjustment.
If None, assumes identity (perfect isomorphism).
"""
levels = [b.level for b in self.branches]
self.alignment = BranchAlignment(
branch_levels=levels,
base_coupling=0.1,
alignment_threshold=0.7,
decay_length=1.0,
k_align=1.0,
S_crit=10.0,
)
if base_similarities is not None:
self.alignment.base_similarities = base_similarities
else:
n = len(self.branches)
self.alignment.base_similarities = np.eye(n)
[docs]
def add_branch(self, label: str = "", level: int = 7) -> int:
"""Add a new reality branch with specified LADDER level."""
bid = len(self.branches)
branch = RealityBranch(bid, label, level=level)
# Initialize with a random pattern
pattern = self._create_random_pattern(dim=self.space.dimension)
branch.set_global_pattern(pattern)
# Also initialize a state vector for decoherence calculations
from mercurial.core.state_space import StateVector
components = np.random.normal(0, 1, size=self.space.dimension) + 1j * np.random.normal(
0, 1, size=self.space.dimension
)
components = components / np.linalg.norm(components)
branch.set_state_vector(StateVector(self.space, components, t=0.0))
self.branches.append(branch)
return bid
[docs]
def set_alignment_matrix(self, similarity_matrix: np.ndarray):
self.alignment = BranchAlignment(similarity_matrix)
[docs]
def run(self, t_span: Tuple[float, float], dt: float = 0.1, lambda_reg: float = 0.0) -> Dict:
"""Run simulation with pattern evolution equation."""
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
np.linspace(t_span[0], t_span[1], n_steps)
# Initialize state vector (concatenate all branch patterns)
state_size = len(self.branches) * self.space.dimension
state = np.zeros(state_size)
for i, branch in enumerate(self.branches):
if branch.global_pattern is not None:
flat = branch.global_pattern.V.flatten()[: self.space.dimension]
state[i * self.space.dimension : (i + 1) * self.space.dimension] = flat
# Define dynamics using free energy gradient
def dynamics(t, y):
dy = np.zeros_like(y)
for i, branch in enumerate(self.branches):
idx = slice(i * self.space.dimension, (i + 1) * self.space.dimension)
pattern = branch.global_pattern
if isinstance(pattern, NeuralPattern):
pass
else:
v = y[idx].reshape(branch.global_pattern.V.shape)
branch.global_pattern.V = v
# Use full free energy gradient
from mercurial.core.dynamics import free_energy_gradient
grad, F = free_energy_gradient(branch.global_pattern, self.free_energy_func)
dy[idx] = -grad.flatten()
# 4. Bottom‑up perception: reality → consciousness
if hasattr(branch, "_is_conscious") and branch._is_conscious:
# For each conscious branch, find the nearest reality branch
# (simplification: use the same branch's reality)
reality_pattern = branch.global_pattern # placeholder
new_conscious = self.perception.update_consciousness_from_reality(
branch.global_pattern, reality_pattern
)
# Apply change to dynamics (as a velocity)
dy[idx] += (new_conscious.V - branch.global_pattern.V).flatten() / dt
# 3. Cross‑branch alignment (existing)
if self.alignment:
for j, other in enumerate(self.branches):
if i != j:
prob = self.alignment.alignment_probability(
i, j, branch.compute_entropy(), other.compute_entropy()
)
if prob > 0:
other_flat = y[
j * self.space.dimension : (j + 1) * self.space.dimension
]
transfer = self.alignment.compute_transfer(other_flat, prob * 0.1)
dy[idx] += transfer[: len(dy[idx])]
return dy
noise_strength = 0.01 * np.ones(state_size)
result = self.integrator.integrate_fixed_step(dynamics, state, t_span, dt, noise_strength)
return result
[docs]
def compute_manifestations(self, branch_id: int, time: float) -> Dict[Modality, float]:
"""Return manifestation probabilities for a branch at given time."""
branch = self.branches[branch_id]
pattern_energy = branch.global_pattern.free_energy()
pattern_entropy = branch.compute_entropy()
probs = {}
for mod in Modality:
probs[mod] = self.spectral_hierarchy.manifestation_probability(
mod, pattern_energy, pattern_entropy
)
return probs
def _create_random_pattern(self, dim: int):
import numpy as np
from mercurial.core.patterns import Constraints, Pattern, StabilityRegime
V = np.random.normal(0, 1, size=(50, dim))
return Pattern(V, Constraints(), StabilityRegime(), "random")
[docs]
def add_neural_branch(self, label: str = "", level: int = 9) -> int:
"""Add a branch with Wilson‑Cowan neural dynamics."""
wc = WilsonCowanPopulation()
pattern = NeuralPattern(wc, label)
bid = len(self.branches)
branch = RealityBranch(bid, label, level=level)
branch.set_global_pattern(pattern)
# Initialize state vector (placeholder)
from mercurial.core.state_space import StateVector
components = np.random.normal(0, 1, size=self.space.dimension) + 1j * np.random.normal(
0, 1, size=self.space.dimension
)
components = components / np.linalg.norm(components)
branch.set_state_vector(StateVector(self.space, components, t=0.0))
self.branches.append(branch)
return bid
# ========================================================================
# Wilson‑Cowan Integration (corrected)
# ========================================================================
[docs]
def run_neural(
self, t_span: Tuple[float, float], dt: float = 0.001, lambda_reg: float = 0.0
) -> Dict:
"""
Run simulation with Wilson‑Cowan dynamics for neural branches.
Enforces non‑negative activity by clamping after each step.
"""
from scipy.integrate import solve_ivp
from mercurial.core.patterns import NeuralPattern
neural_branches = [b for b in self.branches if isinstance(b.global_pattern, NeuralPattern)]
if not neural_branches:
return self.run(t_span, dt, lambda_reg)
# Initial state
state_size = 2 * len(neural_branches)
y0 = np.zeros(state_size)
for idx, branch in enumerate(neural_branches):
pattern = branch.global_pattern
if pattern.E_history is not None and len(pattern.E_history) > 0:
y0[2 * idx] = max(0.0, pattern.E_history[-1])
y0[2 * idx + 1] = max(0.0, pattern.I_history[-1])
else:
y0[2 * idx] = 0.1
y0[2 * idx + 1] = 0.1
def neural_ode(t, y):
dy = np.zeros_like(y)
for idx, branch in enumerate(neural_branches):
E = y[2 * idx]
I = y[2 * idx + 1]
pattern = branch.global_pattern
wc = pattern.wc
P_ext, Q_ext = pattern.get_external_inputs()
# Use clamped derivative
deriv = wc.clamped_derivative(t, np.array([E, I]), P_ext, Q_ext)
dy[2 * idx] = deriv[0]
dy[2 * idx + 1] = deriv[1]
return dy
# Solve with small max_step to prevent large jumps
solution = solve_ivp(
neural_ode, t_span, y0, method="BDF", rtol=1e-4, atol=1e-6, max_step=dt
)
times = solution.t
y_sol = solution.y
# Clamp the entire solution to [0,1]
y_sol = np.clip(y_sol, 0.0, 1.0)
for idx, branch in enumerate(neural_branches):
pattern = branch.global_pattern
E_sol = y_sol[2 * idx, :]
I_sol = y_sol[2 * idx + 1, :]
pattern.E_history = E_sol
pattern.I_history = I_sol
pattern.V = np.column_stack([E_sol, I_sol])
pattern.current_time = times[-1]
return {"t": times, "y": y_sol, "branches": neural_branches}
# ========================================================================
# Neural branch creation (appended)
# ========================================================================
[docs]
def create_neural_branch(
self, label: str = "", level: int = 9, wc_params: Optional[WilsonCowanParams] = None
) -> int:
from mercurial.core.patterns import NeuralPattern
from mercurial.core.wilson_cowan import WilsonCowanPopulation
if wc_params is None:
wc_params = self.wc_params # stored after apply_empirical_parameters()
wc = WilsonCowanPopulation(params=wc_params)
pattern = NeuralPattern(wc, label)
bid = len(self.branches)
branch = RealityBranch(bid, label, level=level)
branch.set_global_pattern(pattern)
# Initialize state vector for decoherence
from mercurial.core.state_space import StateVector
components = np.random.normal(0, 1, size=self.space.dimension) + 1j * np.random.normal(
0, 1, size=self.space.dimension
)
components = components / np.linalg.norm(components)
branch.set_state_vector(StateVector(self.space, components, t=0.0))
self.branches.append(branch)
return bid
# ========================================================================
# Kuramoto‑coupled Wilson‑Cowan simulation (appended)
# ========================================================================
[docs]
def run_neural_with_kuramoto(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
coupling_strength: float = 0.5,
phase_noise: float = 0.05,
kuramoto_feedback: float = 0.1,
lambda_reg: float = 0.0,
) -> Dict:
"""
Run neural simulation with Kuramoto coupling between branches.
Parameters
----------
t_span : tuple
(t_start, t_end) in seconds.
dt : float
Maximum integration step (used by ODE solver).
coupling_strength : float
K in Kuramoto equation.
phase_noise : float
σ_φ.
kuramoto_feedback : float
κ_Kuramoto (scaling of coupling term added to Wilson‑Cowan P_ext).
lambda_reg : float
Regularisation (unused here, for compatibility).
"""
from scipy.integrate import solve_ivp
from mercurial.core.kuramoto import KuramotoNetwork
from mercurial.core.patterns import NeuralPattern
# Collect neural branches
neural_branches = [b for b in self.branches if isinstance(b.global_pattern, NeuralPattern)]
if len(neural_branches) < 2:
print(
"Warning: Need at least 2 neural branches for Kuramoto coupling. Falling back to run_neural."
)
return self.run_neural(t_span, dt, lambda_reg)
N = len(neural_branches)
# Create Kuramoto network with natural frequencies derived from Wilson‑Cowan parameters
# (for simplicity, use fixed frequencies around 40 Hz)
kuramoto_net = KuramotoNetwork(
N, coupling_strength=coupling_strength, noise_amplitude=phase_noise
)
# Initial state: [E1, I1, E2, I2, ...]
y0 = []
for branch in neural_branches:
pattern = branch.global_pattern
if pattern.E_history is not None and len(pattern.E_history) > 0:
y0.append(max(0.0, pattern.E_history[-1]))
y0.append(max(0.0, pattern.I_history[-1]))
else:
y0.append(0.1)
y0.append(0.1)
y0 = np.array(y0)
# Pre‑compute natural frequencies for Kuramoto (based on Wilson‑Cowan parameters)
# Here we extract the excitatory time constant as proxy for frequency
for idx, branch in enumerate(neural_branches):
wc = branch.global_pattern.wc
# Approx natural frequency = 1/(2πτ_e) (Hz) -> rad/s
omega_est = 2 * np.pi * (1.0 / (2 * np.pi * wc.tau_e)) # = 1/τ_e
kuramoto_net.oscillators[idx].omega = omega_est
# ODE function with Kuramoto coupling
def neural_ode(t, y):
dy = np.zeros_like(y)
# Update Kuramoto phases using current E activities (from y)
phases = []
for idx, branch in enumerate(neural_branches):
E = y[2 * idx]
# For phase extraction we need a short history – we approximate using current E only
# In a full implementation we'd store a buffer. Here we use a simple estimator:
# dφ/dt = ω + coupling term, but we need φ to compute coupling. We'll integrate phases separately.
# Instead, we let Kuramoto network evolve independently using its own internal state.
phases.append(kuramoto_net.oscillators[idx].phi)
# Update Kuramoto network for this dt (using current phases)
# We'll do a sub‑step: since dt may be larger than acceptable for phase integration,
# we sub‑step the Kuramoto equations with smaller step.
n_sub = max(1, int(dt / 0.0005))
sub_dt = dt / n_sub
for _ in range(n_sub):
kuramoto_net.update(sub_dt)
# Get mean field for coupling term
mf = kuramoto_net.mean_field()
# Now compute Wilson‑Cowan derivatives with Kuramoto feedback
for idx, branch in enumerate(neural_branches):
E = y[2 * idx]
I = y[2 * idx + 1]
pattern = branch.global_pattern
wc = pattern.wc
P_base, Q_base = pattern.get_external_inputs()
# Compute Kuramoto coupling term for this oscillator
phase = kuramoto_net.oscillators[idx].phi
coupling_term = (
kuramoto_feedback * coupling_strength * np.imag(mf * np.exp(-1j * phase))
)
P_ext = P_base + coupling_term
Q_ext = Q_base # usually no direct coupling to inhibition
deriv = wc.clamped_derivative(t, np.array([E, I]), P_ext, Q_ext)
dy[2 * idx] = deriv[0]
dy[2 * idx + 1] = deriv[1]
return dy
# Solve ODE
solution = solve_ivp(
neural_ode, t_span, y0, method="BDF", rtol=1e-4, atol=1e-6, max_step=dt
)
times = solution.t
y_sol = np.clip(solution.y, 0.0, 1.0)
# Update branch patterns and Kuramoto phases from final state
for idx, branch in enumerate(neural_branches):
pattern = branch.global_pattern
E_sol = y_sol[2 * idx, :]
I_sol = y_sol[2 * idx + 1, :]
pattern.E_history = E_sol
pattern.I_history = I_sol
pattern.V = np.column_stack([E_sol, I_sol])
pattern.current_time = times[-1]
# Also store Kuramoto order parameter history (optional)
for t in times:
# We don't have the full phase trajectory, but we can approximate
# by re‑evaluating phases from the Wilson‑Cowan activities
# For simplicity, we just compute final order parameter.
pass
final_R, final_psi = kuramoto_net.order_parameter()
return {
"t": times,
"y": y_sol,
"branches": neural_branches,
"kuramoto_R": final_R,
"kuramoto_psi": final_psi,
}
# ========================================================================
# 2D Neural Field Simulation (appended)
# ========================================================================
[docs]
def run_neural_field(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
nx: int = 64,
ny: int = 64,
dx: float = None,
wc_params: Optional[dict] = None,
external_stimulus: Optional[Callable[[float], np.ndarray]] = None,
save_history_every: int = 10,
) -> Dict:
"""
Run a 2D neural field simulation using empirical lateral connectivity parameters.
"""
from mercurial.core.neural_field import MexicanHatKernel, NeuralField2D
# Use empirical dx if not provided
if dx is None:
dx = self.neural_field_params.dx
# Create kernels with empirical parameters
kernel_ee = MexicanHatKernel(
size=15,
dx=dx,
A_e=self.neural_field_params.A_e,
sigma_e=self.neural_field_params.sigma_e,
A_i=self.neural_field_params.A_i,
sigma_i=self.neural_field_params.sigma_i,
)
kernel_ie = MexicanHatKernel(size=15, dx=dx, A_e=0.0, A_i=0.5, sigma_i=1.5)
field = NeuralField2D(nx, ny, dx, dx, wc_params or {}, kernel_ee, kernel_ie)
times = []
E_history = []
I_history = []
t = t_span[0]
step = 0
while t < t_span[1]:
if external_stimulus is not None:
P_ext = external_stimulus(t)
else:
P_ext = None
field.step(dt, P_ext, None)
t += dt
step += 1
if step % save_history_every == 0:
times.append(t)
E_history.append(field.E.copy())
I_history.append(field.I.copy())
return {
"t": np.array(times),
"E_history": E_history,
"I_history": I_history,
"final_E": field.E,
"final_I": field.I,
"order_parameter": field.order_parameter(),
}
# ========================================================================
# Kuramoto‑coupled 2D neural fields (appended)
# ========================================================================
[docs]
def run_coupled_fields(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
nx: int = 64,
ny: int = 64,
dx: float = None,
coupling_strength: float = None,
stimulus_A: Optional[Callable[[float], np.ndarray]] = None,
stimulus_B: Optional[Callable[[float], np.ndarray]] = None,
save_history_every: int = 10,
) -> Dict:
from mercurial.core.coupled_fields import CoupledNeuralFields
if dx is None:
dx = self.neural_field_params.dx
if coupling_strength is None:
coupling_strength = self.kuramoto_params.coupling_moderate
coupled = CoupledNeuralFields(
nx,
ny,
dx,
coupling_strength_AB=coupling_strength,
coupling_strength_BA=coupling_strength,
)
times = []
E_A_history, I_A_history = [], []
E_B_history, I_B_history = [], []
R_A_hist, R_B_hist = [], []
t = t_span[0]
step = 0
while t < t_span[1]:
if stimulus_A is not None:
P_A = stimulus_A(t)
else:
P_A = None
if stimulus_B is not None:
P_B = stimulus_B(t)
else:
P_B = None
coupled.step(dt, P_ext_A=P_A, P_ext_B=P_B)
t += dt
step += 1
if step % save_history_every == 0:
times.append(t)
E_A_history.append(coupled.field_A.E.copy())
I_A_history.append(coupled.field_A.I.copy())
E_B_history.append(coupled.field_B.E.copy())
I_B_history.append(coupled.field_B.I.copy())
R_A, _, R_B, _ = coupled.get_order_parameters()
R_A_hist.append(R_A)
R_B_hist.append(R_B)
return {
"t": np.array(times),
"E_A": E_A_history,
"I_A": I_A_history,
"E_B": E_B_history,
"I_B": I_B_history,
"R_A": np.array(R_A_hist),
"R_B": np.array(R_B_hist),
"final_R_A": R_A_hist[-1] if R_A_hist else 0,
"final_R_B": R_B_hist[-1] if R_B_hist else 0,
}
[docs]
def compute_entanglement(
self, field_results: Dict, mode1: Tuple[int, int], mode2: Tuple[int, int]
) -> float:
"""
Compute Pearson correlation coefficient between two spatial modes of a Klein‑Gordon field.
Returns value in [0, 1] (absolute correlation).
"""
# Extract field values over time at the two points
phi1 = [phi[mode1[0], mode1[1]] for phi in field_results["phi"]]
phi2 = [phi[mode2[0], mode2[1]] for phi in field_results["phi"]]
phi1 = np.array(phi1)
phi2 = np.array(phi2)
# Pearson correlation
corr_matrix = np.corrcoef(phi1, phi2)
rho = corr_matrix[0, 1]
return abs(rho)
# ========================================================================
# Hopf oscillator simulation (appended)
# ========================================================================
[docs]
def run_hopf(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
n_oscillators: int = 10,
kuramoto_coupling: float = 0.0,
diffusive_coupling: float = 0.0,
time_varying_alpha: Optional[Callable[[float, int], float]] = None,
) -> Dict:
"""
Run a simulation of a Hopf oscillator network.
Parameters
----------
t_span : tuple
(t_start, t_end) in seconds.
dt : float
Time step.
n_oscillators : int
Number of oscillators.
kuramoto_coupling : float
Phase coupling strength K.
diffusive_coupling : float
Amplitude coupling strength ε.
time_varying_alpha : callable, optional
Function alpha(t, oscillator_index) returning bifurcation parameter.
Returns
-------
dict
Contains times, amplitudes, phases, and order parameter.
"""
from mercurial.core.hopf import HopfNetwork
network = HopfNetwork(
n_oscillators,
kuramoto_coupling=kuramoto_coupling,
diffusive_coupling=diffusive_coupling,
)
times = []
amplitudes = []
phases = []
order_params = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
# Update bifurcation parameters if time‑varying
if time_varying_alpha is not None:
for i, osc in enumerate(network.oscillators):
osc.set_bifurcation(time_varying_alpha(t, i))
network.step(dt)
t += dt
if step % 10 == 0:
times.append(t)
amplitudes.append(network.get_mean_amplitude())
phases.append(network.get_order_parameter()[1])
order_params.append(network.get_order_parameter()[0])
return {
"t": np.array(times),
"amplitudes": np.array(amplitudes),
"phases": np.array(phases),
"order_parameter": np.array(order_params),
}
# ========================================================================
# Jansen‑Rit simulation (appended)
# ========================================================================
[docs]
def run_jansen_rit(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
n_columns: int = 1,
coupling_strength: float = 0.0,
external_input: Optional[Callable[[float], float]] = None,
) -> Dict:
"""
Run a Jansen‑Rit neural mass model simulation.
Parameters
----------
t_span : tuple
(t_start, t_end) in seconds.
dt : float
Time step.
n_columns : int
Number of coupled columns.
coupling_strength : float
Diffusive coupling strength.
external_input : callable, optional
Function of time returning external input p(t) (for one column, applied to all).
Returns
-------
dict
Contains times and EEG signals.
"""
from mercurial.core.jansen_rit import JansenRitNetwork
network = JansenRitNetwork(n_columns, coupling_strength=coupling_strength)
times = []
eeg_signals = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
if external_input is not None:
p = external_input(t)
# Apply same input to all columns
p_arr = np.full(n_columns, p)
else:
p_arr = None
network.step(dt, p_arr)
t += dt
if step % 10 == 0:
times.append(t)
eeg_signals.append(network.get_mean_EEG())
return {
"t": np.array(times),
"EEG": np.array(eeg_signals),
"final_EEG": network.get_mean_EEG(),
}
# ========================================================================
# FDTD electromagnetic simulation (appended)
# ========================================================================
[docs]
def run_fdtd(
self,
t_span: Tuple[float, float],
nx: int = 100,
ny: int = 100,
dx: float = None,
dy: float = None,
tissue_type: str = None,
source: Optional[Callable[[int, int, float], float]] = None,
save_every: int = 10,
) -> Dict:
"""
Run FDTD simulation using empirical biological tissue dielectric properties.
"""
from mercurial.core.fdtd import FDTD2D
if dx is None:
dx = 0.001 # default 1 mm, but empirical might be set in params
if dy is None:
dy = dx
if tissue_type is None:
tissue_type = self.fdtd_params.tissue_type
fdtd = FDTD2D(nx, ny, dx, dy, tissue_type=tissue_type, frequency=self.fdtd_params.frequency)
times = []
Ex_history = []
Ey_history = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / fdtd.dt) + 1
for step in range(n_steps):
fdtd.step(source, t)
t += fdtd.dt
if step % save_every == 0:
times.append(t)
Ex_history.append(fdtd.Ex.copy())
Ey_history.append(fdtd.Ey.copy())
return {
"t": np.array(times),
"Ex": Ex_history,
"Ey": Ey_history,
"final_Ex": fdtd.Ex,
"final_Ey": fdtd.Ey,
}
# ========================================================================
# Hebbian plasticity simulations (appended)
# ========================================================================
[docs]
def run_plastic_wc(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
P_ext: float = 100.0,
Q_ext: float = 0.0,
) -> Dict:
"""
Run Wilson‑Cowan with Hebbian plasticity.
"""
from mercurial.core.plasticity import PlasticWilsonCowan
wc = PlasticWilsonCowan()
times = []
E_hist = []
I_hist = []
w_ee_hist = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
wc.step(dt, P_ext, Q_ext)
t += dt
if step % 10 == 0:
times.append(t)
E_hist.append(wc.E)
I_hist.append(wc.I)
w_ee_hist.append(wc.w_ee)
return {
"t": np.array(times),
"E": np.array(E_hist),
"I": np.array(I_hist),
"w_ee": np.array(w_ee_hist),
}
[docs]
def run_plastic_jr(
self, t_span: Tuple[float, float], dt: float = 0.0005, p_ext: float = 100.0
) -> Dict:
"""
Run Jansen‑Rit with plastic connectivity.
"""
from mercurial.core.plasticity import PlasticJansenRit
jr = PlasticJansenRit()
times = []
eeg_hist = []
C1_hist = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
jr.step(dt, p_ext)
t += dt
if step % 10 == 0:
times.append(t)
eeg_hist.append(jr.get_EEG())
C1_hist.append(jr.C1)
return {"t": np.array(times), "EEG": np.array(eeg_hist), "C1": np.array(C1_hist)}
# ========================================================================
# Sensory transduction simulations (appended)
# ========================================================================
[docs]
def run_visual_transduction(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
intensity_func: Optional[Callable[[float], float]] = None,
) -> Dict:
"""
Simulate visual transduction for a time‑varying light intensity.
"""
from mercurial.core.sensory_transduction import VisualTransductionPipeline
pipeline = VisualTransductionPipeline()
times = []
photoreceptor = []
bipolar = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
I = intensity_func(t) if intensity_func else 100.0
B = pipeline.step(dt, I)
t += dt
if step % 10 == 0:
times.append(t)
photoreceptor.append(pipeline.photoreceptor.V)
bipolar.append(B)
return {
"t": np.array(times),
"photoreceptor": np.array(photoreceptor),
"bipolar": np.array(bipolar),
}
[docs]
def run_auditory_transduction(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
displacement_func: Optional[Callable[[float], float]] = None,
) -> Dict:
"""
Simulate auditory hair cell transduction.
"""
from mercurial.core.sensory_transduction import AuditoryTransductionPipeline
pipeline = AuditoryTransductionPipeline()
times = []
currents = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
x = displacement_func(t) if displacement_func else 0.0
I = pipeline.step(dt, x)
t += dt
if step % 10 == 0:
times.append(t)
currents.append(I)
return {"t": np.array(times), "current": np.array(currents)}
# ========================================================================
# Pattern completion simulation (appended)
# ========================================================================
[docs]
def run_pattern_completion(
self,
t_span: Tuple[float, float],
dt: float = 0.001,
n_neurons: int = None,
stored_patterns: Optional[List[np.ndarray]] = None,
input_function: Optional[Callable[[float], np.ndarray]] = None,
) -> Dict:
"""
Simulate Hopfield pattern completion network using empirical olfactory bulb parameters.
"""
from mercurial.core.pattern_completion import HopfieldPatternCompletion
if n_neurons is None:
n_neurons = self.pattern_completion_params.n_glomeruli
if stored_patterns is None:
# Create random orthogonal patterns
pats = []
for _ in range(3):
p = np.random.choice([-1, 1], size=n_neurons)
pats.append(p)
stored_patterns = pats
net = HopfieldPatternCompletion(
n_neurons=n_neurons,
stored_patterns=stored_patterns,
tau_a=self.pattern_completion_params.tau_a,
tau_f=1.0,
beta_f=0.5,
noise_amp=0.01,
beta_sigmoid=10.0,
hebb_params=self.hebbian_params,
)
times = []
r_hist = []
overlaps = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dt) + 1
for step in range(n_steps):
if input_function is not None:
inp = input_function(t)
else:
inp = np.zeros(n_neurons)
r = net.step(dt, inp)
t += dt
if step % 10 == 0:
times.append(t)
r_hist.append(r.copy())
ov = net.get_overlap(stored_patterns[0])
overlaps.append(ov)
return {
"t": np.array(times),
"r": np.array(r_hist),
"overlap": np.array(overlaps),
"stored_patterns": stored_patterns,
}
# ========================================================================
# QFT propagation simulations (appended)
# ========================================================================
[docs]
def run_klein_gordon(
self,
t_span: Tuple[float, float],
nx: int = 64,
ny: int = 64,
dx: float = 0.1,
m: float = 1.0,
initial_phi: Optional[np.ndarray] = None,
source_func: Optional[Callable[[float], np.ndarray]] = None,
save_every: int = 10,
) -> Dict:
"""
Run 2D Klein‑Gordon simulation.
"""
from mercurial.core.qft_propagation import KleinGordon2D
kg = KleinGordon2D(nx, ny, dx, m=m, initial_phi=initial_phi, absorbing_boundary=True)
times = []
phi_history = []
energy_history = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / kg.dt) + 1
for step in range(n_steps):
src = source_func(t) if source_func else None
kg.step(src)
t += kg.dt
if step % save_every == 0:
times.append(t)
phi_history.append(kg.phi.copy())
energy_history.append(kg.energy_density().copy())
return {
"t": np.array(times),
"phi": phi_history,
"energy": energy_history,
"dt": kg.dt,
"final_phi": kg.phi,
}
[docs]
def run_dirac(
self,
t_span: Tuple[float, float],
nx: int = 256,
dx: float = 0.05,
m: float = 1.0,
save_every: int = 10,
) -> Dict:
"""
Run 1+1D Dirac simulation.
"""
from mercurial.core.qft_propagation import Dirac1D
dirac = Dirac1D(nx, dx, m=m)
times = []
psi_history = []
rho_history = []
t = t_span[0]
n_steps = int((t_span[1] - t_span[0]) / dirac.dt) + 1
for step in range(n_steps):
dirac.step()
t += dirac.dt
if step % save_every == 0:
times.append(t)
psi_history.append(dirac.psi.copy())
rho_history.append(dirac.probability_density().copy())
return {"t": np.array(times), "psi": psi_history, "rho": rho_history, "dt": dirac.dt}
# ========================================================================
# Entanglement analysis (appended)
# ========================================================================
[docs]
def apply_empirical_parameters(self) -> None:
"""Apply literature‑sourced parameters to all modules."""
# Neural models
self.wc_params = WilsonCowanParams()
self.jr_params = JansenRitParams()
self.hopf_params = HopfParams()
self.kuramoto_params = KuramotoParams()
self.hebbian_params = HebbianParams()
# Sensory transduction
self.photo_params = PhotoreceptorParams()
self.hair_params = HairCellParams()
# QFT
self.qft_params = QFTParams()
# Realism enhancements
self.neural_field_params = NeuralFieldParams()
self.fdtd_params = FDTDParams()
self.pattern_completion_params = PatternCompletionParams()
self.entanglement_params = EntanglementParams()
# Priorities 1‑4
self.thermo_params = ThermodynamicParams()
self.pattern_formation_params = PatternFormationParams()
self.impression_params = ImpressionParams()
self.ladder_params = LadderCouplingParams()
self.decoherence_params = DecoherenceParams()
print("Empirical parameters applied:")
print(
f" Wilson‑Cowan: τₑ={self.wc_params.tau_e*1000:.2f}ms, τᵢ={self.wc_params.tau_i*1000:.2f}ms"
)
print(
f" Jansen‑Rit: τₑ={1/self.jr_params.a*1000:.1f}ms, τᵢ={1/self.jr_params.b*1000:.1f}ms"
)
print(f" Kuramoto: K={self.kuramoto_params.coupling_moderate}")
print(f" Hebbian: η={self.hebbian_params.learning_rate}, γ={self.hebbian_params.decay}")
print(
f" Neural field: σₑ={self.neural_field_params.sigma_e}mm, σᵢ={self.neural_field_params.sigma_i}mm"
)
print(
f" FDTD tissue: {self.fdtd_params.tissue_type} (εᵣ={self.fdtd_params.eps_r_gray if self.fdtd_params.tissue_type=='gray_matter' else self.fdtd_params.eps_r_white})"
)
print(f" Pattern completion: N_neurons={self.pattern_completion_params.n_glomeruli}")
print(f" Entanglement: squeezing r={self.entanglement_params.squeezing_r}")
print(f" Thermodynamics: T_eff={self.thermo_params.T_eff}, η={self.thermo_params.eta}")
print(
f" LADDER coupling: K0={self.ladder_params.K0}, decay length={self.ladder_params.l_decay}"
)
print(f" Decoherence: γ₀={self.decoherence_params.gamma_dec0} s⁻¹")