"""
pasted._optimizer
=================
Objective-based structure optimization.
Three methods
-------------
``"annealing"``
Simulated Annealing with exponential cooling from *T_start* to *T_end*.
``"basin_hopping"``
Basin-Hopping: each step applies a more thorough relaxation (3x relax
cycles) before the Metropolis acceptance test. Temperature is held
constant at *T_start*.
``"parallel_tempering"``
Parallel Tempering (replica exchange): *n_replicas* independent Markov
chains run at geometrically spaced temperatures between *T_start* and
*T_pt_high*. Every *swap_interval* steps, adjacent replicas attempt a
swap via the Metropolis exchange criterion. The lowest-temperature
replica benefits from high-temperature replicas crossing energy barriers.
All replicas' final structures are returned in ``OptimizationResult``.
Move types (chosen with equal probability each step)
----------------------------------------------------
Fragment coordinate move
Compute per-atom Q6. Atoms whose local Q6 exceeds *frag_threshold*
are considered "accidentally ordered" and are displaced by a random
vector of magnitude <= *move_step* Ang. If no atom exceeds the threshold
(structure is already fully disordered), a single random atom is moved.
Composition move
Parity-preserving composition change: swap two atoms with different
elements (always parity-safe), or replace two atoms simultaneously
so the net electron-count change is even (parity-preserving fallback).
Objective function
------------------
The objective is **maximised**. Pass a weight dict or any callable::
# dict: f = sum(w * metric)
objective = {"H_atom": 1.0, "H_spatial": 1.0, "Q6": -2.0}
# callable
objective = lambda m: m["H_spatial"] - 2.0 * m["Q6"]
Use negative weights to penalise a metric.
"""
from __future__ import annotations
import math
import random
import sys
import warnings
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
import numpy as np
from ._atoms import (
ALL_METRICS,
ATOMIC_NUMBERS,
_cov_radius_ang,
default_element_pool,
parse_element_spec,
validate_charge_mult,
)
from ._ext import HAS_RELAX
from ._ext import relax_positions as _cpp_relax_positions
from ._generator import Structure, StructureGenerator
from ._io import _fmt
from ._metrics import compute_all_metrics, compute_steinhardt_per_atom
from ._placement import Vec3, place_gas, relax_positions
# ---------------------------------------------------------------------------
# Public type alias
# ---------------------------------------------------------------------------
ObjectiveType = dict[str, float] | Callable[[dict[str, float]], float]
# ---------------------------------------------------------------------------
# OptimizationResult
# ---------------------------------------------------------------------------
@dataclass
class OptimizationResult:
    """Return value of :meth:`StructureOptimizer.run`.

    Wraps all per-restart results and exposes the best structure as a
    first-class attribute. Behaves like a ``list[Structure]`` — indexing,
    iteration, ``len``, and ``bool`` all work — so callers that only want
    the best result can access it without changing existing code::

        result = opt.run()
        best = result.best   # highest-scoring Structure
        best = result[0]     # same — index 0 is always the best
        for s in result:     # iterate all restarts, best first
            print(s.metrics["H_total"])

    Attributes
    ----------
    all_structures:
        All structures produced by each restart, sorted by objective value
        (highest first). ``all_structures[0]`` is always the best.
    objective_scores:
        Scalar objective values corresponding to each entry in
        ``all_structures``.
    n_restarts_attempted:
        Number of restarts that were actually run (may be less than
        ``n_restarts`` when initial-structure generation fails).
    method:
        The optimization method used (``"annealing"``,
        ``"basin_hopping"``, or ``"parallel_tempering"``).

    Examples
    --------
    Single-structure usage (backward-compatible)::

        result = opt.run()
        result.best.to_xyz()   # best structure
        result[0].to_xyz()     # same

    All-restarts usage::

        result = opt.run()
        print(result.summary())
        for rank, s in enumerate(result, 1):
            print(f"rank {rank}: H_total={s.metrics['H_total']:.3f}")
    """

    all_structures: list[Structure] = field(default_factory=list)
    objective_scores: list[float] = field(default_factory=list)
    n_restarts_attempted: int = 0
    method: str = "annealing"

    # ------------------------------------------------------------------ #
    # list-compatible interface (best-first order)                       #
    # ------------------------------------------------------------------ #
    @property
    def best(self) -> Structure:
        """The structure with the highest objective value."""
        if not self.all_structures:
            raise RuntimeError("OptimizationResult is empty — all restarts failed.")
        return self.all_structures[0]

    def __len__(self) -> int:
        return len(self.all_structures)

    def __iter__(self) -> Iterator[Structure]:
        return iter(self.all_structures)

    def __getitem__(self, index: int | slice) -> Structure | list[Structure]:
        # Both int and slice indexing delegate directly to the backing list
        # (a slice naturally yields a list), so no isinstance branch is needed.
        return self.all_structures[index]

    def __bool__(self) -> bool:
        return bool(self.all_structures)

    def __repr__(self) -> str:
        if not self.all_structures:
            return f"OptimizationResult(empty, method={self.method!r})"
        best_f = self.objective_scores[0] if self.objective_scores else float("nan")
        return (
            f"OptimizationResult("
            f"restarts={self.n_restarts_attempted}, "
            f"best_f={best_f:.4f}, "
            f"method={self.method!r})"
        )

    # ------------------------------------------------------------------ #
    # Metadata helpers                                                   #
    # ------------------------------------------------------------------ #
    def summary(self) -> str:
        """Return a human-readable one-line summary of the optimization run.

        Returns
        -------
        str
            E.g. ``"restarts=5 best_f=1.2294 worst_f=0.7823 method='annealing'"``.
        """
        if not self.objective_scores:
            return f"restarts={self.n_restarts_attempted} no results method={self.method!r}"
        return (
            f"restarts={self.n_restarts_attempted}"
            f" best_f={self.objective_scores[0]:.4f}"
            f" worst_f={self.objective_scores[-1]:.4f}"
            f" method={self.method!r}"
        )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def parse_objective_spec(specs: list[str]) -> dict[str, float]:
    """Parse ``["METRIC:WEIGHT", ...]`` into a weight dict.

    Parameters
    ----------
    specs:
        Each string must be of the form ``"METRIC:WEIGHT"``,
        e.g. ``["H_atom:1.0", "Q6:-2.0"]``.

    Returns
    -------
    dict[str, float]

    Raises
    ------
    ValueError
        On malformed strings, unknown metric names, or non-numeric weights.
    """
    result: dict[str, float] = {}
    for spec in specs:
        parts = spec.split(":")
        if len(parts) != 2:
            raise ValueError(f"Expected 'METRIC:WEIGHT', got {spec!r}")
        metric, weight_s = parts
        if metric not in ALL_METRICS:
            raise ValueError(
                f"Unknown metric {metric!r}. Valid: {sorted(ALL_METRICS)}"
            )
        try:
            weight = float(weight_s)
        except ValueError:
            # Re-raise with the offending spec so the user sees which
            # command-line argument was malformed, not just the bad token.
            raise ValueError(
                f"Weight in {spec!r} is not a number: {weight_s!r}"
            ) from None
        result[metric] = weight
    return result
def _eval_objective(
metrics: dict[str, float],
objective: ObjectiveType,
) -> float:
"""Evaluate the scalar objective from a metrics dict."""
if callable(objective):
return float(objective(metrics))
return float(sum(w * metrics.get(k, 0.0) for k, w in objective.items()))
def _fragment_move(
positions: list[Vec3],
per_atom_q6: np.ndarray,
frag_threshold: float,
move_step: float,
rng: random.Random,
) -> list[Vec3]:
"""Displace atoms whose local Q6 exceeds *frag_threshold*.
Falls back to moving a single random atom when no atom exceeds the
threshold (structure is already maximally disordered).
"""
candidates = [i for i, q in enumerate(per_atom_q6) if q > frag_threshold]
if not candidates:
candidates = [rng.randrange(len(positions))]
new_pos = list(positions)
for i in candidates:
x, y, z = positions[i]
new_pos[i] = (
x + rng.uniform(-move_step, move_step),
y + rng.uniform(-move_step, move_step),
z + rng.uniform(-move_step, move_step),
)
return new_pos
def _composition_move(
atoms: list[str],
element_pool: list[str],
rng: random.Random,
charge: int = 0,
mult: int = 1,
) -> list[str]:
"""Propose a parity-preserving composition change.
Two move types are attempted in order:
1. **Swap**: exchange the element types of two atoms with different
elements. Since the total electron count is unchanged, parity is
always preserved.
2. **Parity-preserving replace**: when all atoms are the same element
(so no swap is possible), replace *two* atoms simultaneously so that
the net change in electron count is even, keeping the charge/
multiplicity parity invariant. Both replacement elements are drawn
independently from *element_pool*; if no valid pair is found in
*max_tries* attempts the best single-replacement is used with a
fallback parity correction.
The ``charge`` and ``mult`` parameters are used only to determine the
required electron-count parity (even or odd); they do not affect the
swap path.
Parameters
----------
atoms:
Current element list (modified copy is returned).
element_pool:
Elements available for replacement.
rng:
Seeded random-number generator.
charge:
System charge (used to infer required parity).
mult:
Spin multiplicity 2S+1 (used to infer required parity).
Returns
-------
list[str]
New atom list with the same length as *atoms*.
"""
new_atoms = list(atoms)
n = len(new_atoms)
# ── Path 1: swap two atoms with different elements ────────────────────
# Electron count is preserved → always parity-valid.
for _ in range(20):
i, j = rng.sample(range(n), 2)
if new_atoms[i] != new_atoms[j]:
new_atoms[i], new_atoms[j] = new_atoms[j], new_atoms[i]
return new_atoms
# ── Path 2: parity-preserving two-atom replacement ───────────────────
# All atoms are the same element (swap path found no diverse pair).
# Replace atom i with X and atom j with Y such that
# ΔZ_total = (Z(X) - Z(atoms[i])) + (Z(Y) - Z(atoms[j]))
# is even, preserving the electron-count parity required by charge/mult.
#
# Required parity of total electrons:
# n_electrons = sum(Z) - charge
# For mult=1 (singlet), n_electrons must be even → sum(Z) parity = charge parity
i = rng.randrange(n)
zi = ATOMIC_NUMBERS[new_atoms[i]]
# Separate pool into elements whose Z has the same parity as zi (ΔZ even)
# vs different parity (ΔZ odd → need a compensating second replacement).
same_parity_pool = [e for e in element_pool if ATOMIC_NUMBERS[e] % 2 == zi % 2]
diff_parity_pool = [e for e in element_pool if ATOMIC_NUMBERS[e] % 2 != zi % 2]
if same_parity_pool:
# Replace one atom with same-parity element: ΔZ is even → parity kept.
new_atoms[i] = rng.choice(same_parity_pool)
return new_atoms
if diff_parity_pool and n >= 2:
# Replace two atoms with opposite-parity elements so ΔZ_total is even.
j = rng.choice([k for k in range(n) if k != i])
# Both from diff_parity_pool give ΔZ each odd → ΔZ_total even. ✓
# Both from diff_parity_pool gives ΔZ each odd → ΔZ_total even. ✓
new_atoms[i] = rng.choice(diff_parity_pool)
new_atoms[j] = rng.choice(diff_parity_pool)
return new_atoms
# ── Last-resort fallback ──────────────────────────────────────────────
# Pool has only one parity class and n == 1, or pool is empty.
# Replace one atom; caller's validate_charge_mult will reject if invalid.
new_atoms[i] = rng.choice(element_pool)
return new_atoms
# ---------------------------------------------------------------------------
# StructureOptimizer
# ---------------------------------------------------------------------------
[docs]
class StructureOptimizer:
"""Optimise a single structure to maximise a disorder objective.
Parameters
----------
n_atoms:
Number of atoms.
charge:
Total system charge.
mult:
Spin multiplicity 2S+1.
objective:
Weight dict ``{"METRIC": weight, ...}`` or any callable
``(metrics: dict[str, float]) -> float``.
The optimizer **maximises** the scalar value.
Use negative weights to penalise a metric.
elements:
Element pool — spec string (``"6,7,8"``), list of symbols, or
``None`` for all Z = 1–106.
method:
``"annealing"`` (default), ``"basin_hopping"``, or
``"parallel_tempering"``.
max_steps:
Number of MC steps per restart (or per replica per restart for
``"parallel_tempering"``; default: 5000).
T_start:
Initial temperature (default: 1.0). For
``"parallel_tempering"`` this is the *highest* replica
temperature.
T_end:
Final temperature for SA (default: 0.01). For
``"parallel_tempering"`` this is the *lowest* replica
temperature (the coldest, most selective replica).
BH uses *T_start* throughout.
n_replicas:
Number of temperature replicas for ``"parallel_tempering"``
(default: 4). Ignored for other methods. Temperatures are
spaced geometrically between *T_end* and *T_start*.
pt_swap_interval:
Attempt a replica-exchange swap every this many MC steps
(default: 10). Ignored for other methods.
allow_displacements:
When ``True`` (default), atomic-position moves (fragment moves) are
included in the MC step pool. When ``False``, only composition moves
are performed — atomic coordinates are held fixed and only element
types are optimised. Set to ``False`` when exploring compositional
disorder at fixed geometry (e.g. a pre-relaxed lattice).
Cannot be ``False`` simultaneously with *allow_composition_moves*.
allow_composition_moves:
When ``True`` (default), each MC step randomly chooses between a
**fragment move** (atomic displacement) and a **composition move**
(element-type swap) with equal probability. When ``False``, only
fragment moves are performed — element types are held fixed and only
atomic positions are optimised. Set to ``False`` when the
composition is predetermined and should not change during
optimisation.
Cannot be ``False`` simultaneously with *allow_displacements*.
frag_threshold:
Local Q6 threshold for fragment selection (default: 0.3).
Atoms with local Q6 > threshold are preferentially displaced.
move_step:
Maximum displacement magnitude per coordinate step (Å, default: 0.5).
lcc_threshold:
Minimum ``graph_lcc`` required to accept a step (default: 0.0,
i.e. no connectivity constraint). Set to 0.8 to enforce that at
least 80 % of atoms remain connected.
cov_scale:
Minimum distance scale factor for :func:`relax_positions`.
relax_cycles:
Max repulsion-relaxation cycles per step. Basin-Hopping uses
3× this value for its local-minimisation step.
cutoff:
Distance cutoff (Å) for Steinhardt / graph metrics. Auto-computed
from the element pool when ``None``.
n_bins:
Histogram bins for ``H_spatial`` / ``RDF_dev`` (default: 20).
w_atom:
Weight of ``H_atom`` in ``H_total`` (default: 0.5).
w_spatial:
Weight of ``H_spatial`` in ``H_total`` (default: 0.5).
n_restarts:
Independent optimisation runs (default: 1). The best result
across all restarts is returned.
seed:
Random seed (``None`` → non-deterministic).
verbose:
Print per-step progress to stderr (default: ``False``).
Examples
--------
Class API::
from pasted import StructureOptimizer
opt = StructureOptimizer(
n_atoms=50,
charge=0, mult=1,
elements="24,25,26,27,28", # Cantor alloy
objective={"H_atom": 1.0, "H_spatial": 1.0, "Q6": -2.0},
method="annealing",
max_steps=5000,
lcc_threshold=0.8,
seed=42,
)
best = opt.run()
Callable objective::
opt = StructureOptimizer(
...,
objective=lambda m: m["H_spatial"] - 2.0 * m["Q6"],
)
"""
def __init__(
    self,
    *,
    n_atoms: int,
    charge: int,
    mult: int,
    objective: ObjectiveType,
    elements: str | list[str] | None = None,
    method: str = "annealing",
    max_steps: int = 5000,
    T_start: float = 1.0,
    T_end: float = 0.01,
    frag_threshold: float = 0.3,
    move_step: float = 0.5,
    allow_composition_moves: bool = True,
    allow_displacements: bool = True,
    lcc_threshold: float = 0.0,
    cov_scale: float = 1.0,
    relax_cycles: int = 1500,
    cutoff: float | None = None,
    n_bins: int = 20,
    w_atom: float = 0.5,
    w_spatial: float = 0.5,
    n_restarts: int = 1,
    n_replicas: int = 4,
    pt_swap_interval: int = 10,
    seed: int | None = None,
    verbose: bool = False,
) -> None:
    """Validate options and store optimisation settings.

    See the class docstring for the meaning of every parameter.

    Raises
    ------
    ValueError
        If *method* is not one of the three supported methods, or if
        both *allow_displacements* and *allow_composition_moves* are
        ``False`` (no move type would be available).
    """
    if method not in ("annealing", "basin_hopping", "parallel_tempering"):
        raise ValueError(
            f"method must be 'annealing', 'basin_hopping', or "
            f"'parallel_tempering', got {method!r}"
        )
    if not allow_displacements and not allow_composition_moves:
        raise ValueError(
            "allow_displacements and allow_composition_moves cannot both be False: "
            "at least one move type must be enabled."
        )
    self.n_atoms = n_atoms
    self.charge = charge
    self.mult = mult
    self.objective = objective
    self.method = method
    self.max_steps = max_steps
    self.T_start = T_start
    self.T_end = T_end
    self.frag_threshold = frag_threshold
    self.move_step = move_step
    self.allow_composition_moves = allow_composition_moves
    self.allow_displacements = allow_displacements
    self.lcc_threshold = lcc_threshold
    self.cov_scale = cov_scale
    self.relax_cycles = relax_cycles
    self.n_bins = n_bins
    self.w_atom = w_atom
    self.w_spatial = w_spatial
    self.n_restarts = n_restarts
    self.n_replicas = n_replicas
    self.pt_swap_interval = pt_swap_interval
    self.seed = seed
    self.verbose = verbose
    # Element pool: None → all supported elements; str → spec like "6,7,8";
    # otherwise an explicit list of element symbols.
    if elements is None:
        self._element_pool: list[str] = default_element_pool()
    elif isinstance(elements, str):
        self._element_pool = parse_element_spec(elements)
    else:
        self._element_pool = list(elements)
    # Cutoff: resolved once here (needs cov_scale and _element_pool set above).
    self._cutoff: float = self._resolve_cutoff(cutoff)
# ------------------------------------------------------------------ #
# Internal helpers #
# ------------------------------------------------------------------ #
def _log(self, msg: str) -> None:
    """Write *msg* to stderr, but only when verbose mode is enabled."""
    if not self.verbose:
        return
    print(msg, file=sys.stderr)
def _resolve_cutoff(self, override: float | None) -> float:
    """Return the neighbour-distance cutoff (Å) for Steinhardt/graph metrics.

    A user-supplied *override* wins; otherwise the cutoff is derived from
    the element pool as ``cov_scale * 1.5 * median(r_i + r_j)`` over all
    covalent-radius pair sums.

    Note: ``self._log`` already checks ``self.verbose``, so the redundant
    ``if self.verbose`` guards around the logging calls were removed
    (behaviour is unchanged).
    """
    if override is not None:
        self._log(f"[cutoff] {override:.3f} Å (user-specified)")
        return override
    radii = [_cov_radius_ang(s) for s in self._element_pool]
    # All unordered pair sums r_i + r_j (including i == j), sorted so the
    # middle element is the median.
    pair_sums = sorted(
        ra + rb for i, ra in enumerate(radii) for rb in radii[i:]
    )
    median_sum = pair_sums[len(pair_sums) // 2]
    cutoff = self.cov_scale * 1.5 * median_sum
    self._log(
        f"[cutoff] {cutoff:.3f} Å (auto: cov_scale={self.cov_scale} × 1.5 × "
        f"median(r_i+r_j)={median_sum:.3f} Å)"
    )
    return cutoff
def _auto_region(self) -> str:
    """Return a ``"sphere:R"`` region spec sized for roughly bulk density."""
    # Volume claimed by one atom, assuming a mean atomic radius of ~1.3 Å.
    atom_volume = 4 / 3 * math.pi * 1.3**3  # Å³
    # Sphere radius giving packing fraction 0.74 (close packing) for n atoms.
    radius = (3 * self.n_atoms * atom_volume / (4 * math.pi * 0.74)) ** (1 / 3)
    # Add a 20 % safety margin.
    return f"sphere:{radius * 1.2:.1f}"
def _make_initial(self, rng: random.Random) -> Structure | None:
    """Generate a valid initial structure using StructureGenerator.

    Up to 50 independently seeded generation attempts are made; ``None``
    is returned when every attempt fails.
    """
    region = self._auto_region()
    attempt = 0
    while attempt < 50:
        attempt += 1
        gen_seed = rng.randint(0, 2**31)
        batch = StructureGenerator(
            n_atoms=self.n_atoms,
            charge=self.charge,
            mult=self.mult,
            mode="gas",
            region=region,
            elements=self._element_pool,
            cov_scale=self.cov_scale,
            relax_cycles=self.relax_cycles,
            cutoff=self._cutoff,
            n_bins=self.n_bins,
            w_atom=self.w_atom,
            w_spatial=self.w_spatial,
            n_samples=1,
            seed=gen_seed,
        ).generate()
        if batch:
            return batch.structures[0]
    return None
def _temperature(self, step: int) -> float:
    """Return the Metropolis temperature for *step*.

    Basin-Hopping and Parallel Tempering run at a constant ``T_start``;
    annealing cools exponentially from ``T_start`` to ``T_end``. A
    non-positive endpoint also pins the temperature at ``T_start``.
    """
    constant_schedule = (
        self.method in ("basin_hopping", "parallel_tempering")
        or self.T_end <= 0
        or self.T_start <= 0
    )
    if constant_schedule:
        return self.T_start
    fraction = step / max(self.max_steps - 1, 1)
    return float(self.T_start * (self.T_end / self.T_start) ** fraction)
def _relax_cycles_for_method(self) -> int:
    """Return relaxation cycles per step; Basin-Hopping gets a 3× budget."""
    if self.method == "basin_hopping":
        return self.relax_cycles * 3
    return self.relax_cycles
def _pt_temperatures(self) -> list[float]:
    """Geometric temperature ladder from T_end (coldest) to T_start (hottest).

    ``n_replicas`` is clamped to at least 2 (a single replica would make
    exchanges meaningless), the low end is floored at 1e-6, and the high
    end is forced at least 1 % above the low end so the geometric ratio is
    always well defined.

    (The previous ``if n == 1`` branch was unreachable dead code after the
    ``max(2, ...)`` clamp and has been removed; behaviour is unchanged.)
    """
    n = max(2, self.n_replicas)
    t_lo = max(self.T_end, 1e-6)
    t_hi = max(self.T_start, t_lo * 1.01)
    ratio = (t_hi / t_lo) ** (1.0 / (n - 1))
    return [round(t_lo * ratio ** k, 8) for k in range(n)]  # cold→hot
# ------------------------------------------------------------------ #
# Parallel Tempering #
# ------------------------------------------------------------------ #
def _run_parallel_tempering(
    self, initial: Structure | None, restart_idx: int
) -> list[tuple[float, Structure]]:
    """Run one Parallel Tempering restart; return (score, structure) per replica.

    Algorithm
    ---------
    1. Build N replicas at temperatures T_0 < T_1 < … < T_{N-1}
       (geometric ladder from T_end to T_start).
    2. Every step: each replica independently proposes a move and
       applies Metropolis acceptance at its own temperature.
    3. Every ``pt_swap_interval`` steps: for each adjacent pair (k, k+1)
       attempt a replica exchange with the Metropolis criterion::

           ΔE = (β_k − β_{k+1}) × (f_{k+1} − f_k)

       where β = 1/T and f is the objective value (higher is better).
       Accept with probability min(1, exp(ΔE)).
    4. Track the global best across all replicas and all steps.

    Returns the global best first, followed by each replica's final state
    in replica order (not score order); ``run()`` incorporates them into
    ``OptimizationResult``.
    """
    # Restart-level RNG; offset by restart index for reproducible independence.
    rng = random.Random(
        None if self.seed is None else self.seed + restart_idx * 97
    )
    temps = self._pt_temperatures()
    n_rep = len(temps)
    rc = self.relax_cycles
    # ── Initialise one state per replica ────────────────────────────
    # Replica 0 = coldest (T_end), Replica N-1 = hottest (T_start).
    # All replicas start from the same initial structure (or independent
    # gas-mode structures when initial is None) so that they explore
    # different regions from the start due to different acceptance rates.
    states_atoms: list[list[str]] = []        # per-replica element lists
    states_positions: list[list[Vec3]] = []   # per-replica coordinates
    states_metrics: list[dict[str, float]] = []
    states_f: list[float] = []                # per-replica objective values
    states_q6: list[np.ndarray] = []          # per-replica per-atom Q6
    for k in range(n_rep):
        if initial is not None:
            a = list(initial.atoms)
            p = list(initial.positions)
        else:
            # Generate a fresh random structure for each replica
            rng_k = random.Random(
                None if self.seed is None else self.seed + restart_idx * 97 + k * 13
            )
            # NOTE(review): the element list handed to place_gas below is a
            # fresh random draw that differs from `a_raw` generated right
            # after it — the atoms used for placement are not the atoms the
            # replica ends up with. Confirm whether placement should use
            # `a_raw` instead.
            # NOTE(review): elements are drawn from the restart-level `rng`
            # while only placement uses the per-replica `rng_k`; replicas
            # differ in composition only because `rng` advances between them.
            _, p_raw = place_gas(
                [rng.choice(self._element_pool) for _ in range(self.n_atoms)],
                self._auto_region(), rng_k,
            )
            a_raw = [rng.choice(self._element_pool) for _ in range(self.n_atoms)]
            ok, _ = validate_charge_mult(a_raw, self.charge, self.mult)
            if not ok:
                # NOTE(review): this is a no-op — list(a_raw) merely copies,
                # it does not repair an invalid charge/mult parity, so an
                # invalid composition proceeds unchanged. TODO confirm the
                # intended "first-valid replacement" fallback.
                a_raw = list(a_raw)
                # fallback: use first-valid replacement
            a = a_raw
            p_list, _ = relax_positions(a, p_raw, self.cov_scale, rc)
            p = list(p_list)
        m = compute_all_metrics(
            a, p, self.n_bins, self.w_atom, self.w_spatial,
            self._cutoff, self.cov_scale,
        )
        f = _eval_objective(m, self.objective)
        pts = np.array(p)
        q6: np.ndarray = compute_steinhardt_per_atom(pts, [6], self._cutoff)["Q6"]
        states_atoms.append(a)
        states_positions.append(p)
        states_metrics.append(m)
        states_f.append(f)
        states_q6.append(q6)
    # Seed the global best with the highest-scoring initial replica state.
    best_atoms = list(states_atoms[0])
    best_positions = list(states_positions[0])
    best_metrics = dict(states_metrics[0])
    best_f = states_f[0]
    for k in range(n_rep):
        if states_f[k] > best_f:
            best_f = states_f[k]
            best_atoms = list(states_atoms[k])
            best_positions = list(states_positions[k])
            best_metrics = dict(states_metrics[k])
    # Exchange-attempt counts for diagnostics
    n_swap_attempted = 0
    n_swap_accepted = 0
    # Precompute radii per replica (updated on composition moves)
    replicas_radii: list[np.ndarray] = [
        np.array([_cov_radius_ang(a) for a in atoms], dtype=float)
        for atoms in states_atoms
    ]
    # seed_int = -1 tells the C++ relaxer to pick its own seed.
    seed_int = -1 if self.seed is None else int(self.seed + restart_idx * 97)
    log_interval = max(1, self.max_steps // 20)
    width = len(str(self.max_steps))  # field width for step counter in logs
    for step in range(self.max_steps):
        # ── Each replica: one Metropolis step ────────────────────────
        for k in range(n_rep):
            T_k = temps[k]
            atoms = states_atoms[k]
            positions = states_positions[k]
            f_k = states_f[k]
            q6_k = states_q6[k]
            radii_k = replicas_radii[k]
            # Propose move: fragment (coordinate) move with p=0.5 when both
            # move types are enabled, otherwise whichever one is allowed.
            _do_fragment = (
                self.allow_displacements
                and (not self.allow_composition_moves or rng.random() < 0.5)
            )
            if _do_fragment:
                new_positions = _fragment_move(
                    positions, q6_k, self.frag_threshold, self.move_step, rng
                )
                new_atoms = list(atoms)
            else:
                new_atoms = _composition_move(
                    atoms, self._element_pool, rng, self.charge, self.mult
                )
                new_positions = list(positions)
            # Relax: use the C++ relaxer when available, else the Python one.
            if HAS_RELAX:
                if new_atoms != atoms:
                    # Composition changed → radii must be recomputed.
                    radii_new = np.array(
                        [_cov_radius_ang(a) for a in new_atoms], dtype=float
                    )
                else:
                    radii_new = radii_k
                pts_arr = np.array(new_positions, dtype=float)
                pts_arr, _ = _cpp_relax_positions(
                    pts_arr, radii_new, self.cov_scale, rc, seed_int
                )
                new_positions = [tuple(row) for row in pts_arr]
            else:
                new_positions, _ = relax_positions(
                    new_atoms, new_positions, self.cov_scale, rc
                )
                # NOTE(review): radii_new is left stale here even after a
                # composition move; it only feeds replicas_radii (unused in
                # this branch), but the cached values become inaccurate.
                radii_new = radii_k
            # Reject moves that break charge/multiplicity parity outright.
            ok_parity, _ = validate_charge_mult(new_atoms, self.charge, self.mult)
            if not ok_parity:
                continue
            new_metrics = compute_all_metrics(
                new_atoms, new_positions, self.n_bins,
                self.w_atom, self.w_spatial, self._cutoff, self.cov_scale,
            )
            f_new = _eval_objective(new_metrics, self.objective)
            # Hard connectivity constraint: reject if the largest connected
            # component falls below the threshold.
            if new_metrics.get("graph_lcc", 0.0) < self.lcc_threshold:
                continue
            # Metropolis acceptance at this replica's own temperature.
            delta = f_new - f_k
            accept = delta >= 0 or (
                T_k > 1e-12 and rng.random() < math.exp(delta / T_k)
            )
            if accept:
                states_atoms[k] = new_atoms
                states_positions[k] = new_positions
                states_metrics[k] = new_metrics
                states_f[k] = f_new
                replicas_radii[k] = radii_new
                new_pts = np.array(new_positions)
                states_q6[k] = compute_steinhardt_per_atom(
                    new_pts, [6], self._cutoff
                )["Q6"]
                if f_new > best_f:
                    best_f = f_new
                    best_atoms = list(new_atoms)
                    best_positions = list(new_positions)
                    best_metrics = dict(new_metrics)
        # ── Replica-exchange swaps every pt_swap_interval steps ───────
        if (step + 1) % self.pt_swap_interval == 0:
            # Attempt swaps between adjacent replica pairs in random order
            pairs = list(range(n_rep - 1))
            rng.shuffle(pairs)
            for k in pairs:
                f_k = states_f[k]
                f_k1 = states_f[k + 1]
                T_k = temps[k]
                T_k1 = temps[k + 1]
                # Metropolis criterion for maximization:
                #   ΔE = (β_k − β_{k+1}) × (f_{k+1} − f_k)
                # β = 1/T, higher T = more permissive
                beta_k = 1.0 / T_k if T_k > 1e-12 else 1e12
                beta_k1 = 1.0 / T_k1 if T_k1 > 1e-12 else 1e12
                delta_swap = (beta_k - beta_k1) * (f_k1 - f_k)
                n_swap_attempted += 1
                if delta_swap >= 0 or rng.random() < math.exp(delta_swap):
                    # Exchange states k ↔ k+1 (all per-replica arrays at once)
                    (
                        states_atoms[k], states_atoms[k + 1],
                        states_positions[k], states_positions[k + 1],
                        states_metrics[k], states_metrics[k + 1],
                        states_f[k], states_f[k + 1],
                        states_q6[k], states_q6[k + 1],
                        replicas_radii[k], replicas_radii[k + 1],
                    ) = (
                        states_atoms[k + 1], states_atoms[k],
                        states_positions[k + 1], states_positions[k],
                        states_metrics[k + 1], states_metrics[k],
                        states_f[k + 1], states_f[k],
                        states_q6[k + 1], states_q6[k],
                        replicas_radii[k + 1], replicas_radii[k],
                    )
                    n_swap_accepted += 1
        # ── Logging ──────────────────────────────────────────────────
        if self.verbose and (step + 1) % log_interval == 0:
            swap_rate = (
                n_swap_accepted / n_swap_attempted
                if n_swap_attempted > 0 else 0.0
            )
            self._log(
                f"[restart={restart_idx + 1} step={step + 1:>{width}}/{self.max_steps}] "
                f"best_f={best_f:.4f} "
                f"replica_f=[{', '.join(f'{f:.3f}' for f in states_f)}] "
                f"T=[{', '.join(f'{t:.3f}' for t in temps)}] "
                f"swap_rate={swap_rate:.2f}"
            )
    if self.verbose:
        swap_rate = n_swap_accepted / max(n_swap_attempted, 1)
        self._log(
            f"[PT restart={restart_idx + 1}] best_f={best_f:.4f} "
            f"swap_accept_rate={swap_rate:.3f} "
            f"({n_swap_accepted}/{n_swap_attempted} swaps)"
        )
    # Return (score, structure) for each replica's final state + global best
    all_results: list[tuple[float, Structure]] = []
    # Global best first
    all_results.append((best_f, Structure(
        atoms=best_atoms,
        positions=best_positions,
        charge=self.charge,
        mult=self.mult,
        metrics=best_metrics,
        mode="opt_parallel_tempering",
        sample_index=restart_idx + 1,
        seed=self.seed,
    )))
    # Add each replica's final state (if not identical to best)
    for k in range(n_rep):
        f_k = states_f[k]
        if abs(f_k - best_f) > 1e-10 or states_atoms[k] != best_atoms:
            all_results.append((f_k, Structure(
                atoms=list(states_atoms[k]),
                positions=list(states_positions[k]),
                charge=self.charge,
                mult=self.mult,
                metrics=dict(states_metrics[k]),
                mode=f"opt_parallel_tempering_T{temps[k]:.4f}",
                sample_index=restart_idx * n_rep + k + 1,
                seed=self.seed,
            )))
    return all_results
# ------------------------------------------------------------------ #
# Single restart #
# ------------------------------------------------------------------ #
def _run_one(self, initial: Structure, restart_idx: int) -> Structure:
    """Run one simulated-annealing / basin-hopping restart.

    Starting from *initial*, performs ``max_steps`` Metropolis steps
    (fragment and/or composition moves, each followed by a repulsion
    relaxation) and returns the best structure found by objective value.

    Parameters
    ----------
    initial:
        Starting structure; its atoms/positions are copied, not mutated.
    restart_idx:
        Zero-based restart index, used to offset the RNG seed so restarts
        are independent but reproducible.

    Returns
    -------
    Structure
        The best-scoring structure encountered during the run.
    """
    rng = random.Random(
        None if self.seed is None else self.seed + restart_idx * 97
    )
    atoms: list[str] = list(initial.atoms)
    positions: list[Vec3] = list(initial.positions)
    # Radii cache for the C++ relaxer; refreshed whenever a composition
    # move is accepted (see the accept branch below). seed_int = -1 lets
    # the C++ relaxer pick its own seed.
    radii = np.array([_cov_radius_ang(a) for a in atoms], dtype=float)
    seed_int: int = -1 if self.seed is None else int(self.seed + restart_idx * 97)
    # Initial evaluation
    pts = np.array(positions)
    metrics = compute_all_metrics(
        atoms, positions, self.n_bins, self.w_atom, self.w_spatial, self._cutoff,
        self.cov_scale,
    )
    f_current = _eval_objective(metrics, self.objective)
    per_atom_q6: np.ndarray = compute_steinhardt_per_atom(pts, [6], self._cutoff)[
        "Q6"
    ]
    best_atoms = list(atoms)
    best_positions = list(positions)
    best_f = f_current
    best_metrics = dict(metrics)
    rc = self._relax_cycles_for_method()
    log_interval = max(1, self.max_steps // 20)
    width = len(str(self.max_steps))
    for step in range(self.max_steps):
        T = self._temperature(step)
        # ── Move ─────────────────────────────────────────────────────
        # Fragment move with p=0.5 when both move types are enabled,
        # otherwise whichever type is allowed.
        _do_fragment = (
            self.allow_displacements
            and (not self.allow_composition_moves or rng.random() < 0.5)
        )
        if _do_fragment:
            new_positions = _fragment_move(
                positions, per_atom_q6, self.frag_threshold, self.move_step, rng
            )
            new_atoms = list(atoms)
        else:
            new_atoms = _composition_move(
                atoms, self._element_pool, rng, self.charge, self.mult
            )
            new_positions = list(positions)
        # Compare BEFORE `atoms` is rebound in the accept branch — this is
        # what the broken post-acceptance check could never detect.
        composition_changed = new_atoms != atoms
        # ── Relax (distance constraint) ───────────────────────────────
        # Bypass Python wrapper; call the C++ relaxer when available.
        if HAS_RELAX:
            if composition_changed:
                new_radii = np.array(
                    [_cov_radius_ang(a) for a in new_atoms], dtype=float
                )
            else:
                new_radii = radii
            new_pts_arr = np.array(new_positions, dtype=float)
            new_pts_arr, _ = _cpp_relax_positions(
                new_pts_arr, new_radii, self.cov_scale, rc, seed_int
            )
            new_positions = [tuple(row) for row in new_pts_arr]
        else:
            new_positions, _ = relax_positions(
                new_atoms, new_positions, self.cov_scale, rc
            )
            # Keep the radii cache consistent in the Python-fallback path
            # too, so the accept branch can reuse it unconditionally.
            new_radii = (
                np.array([_cov_radius_ang(a) for a in new_atoms], dtype=float)
                if composition_changed
                else radii
            )
        # ── Charge/mult validity ──────────────────────────────────────
        ok, _ = validate_charge_mult(new_atoms, self.charge, self.mult)
        if not ok:
            continue
        # ── Evaluate ─────────────────────────────────────────────────
        new_metrics = compute_all_metrics(
            new_atoms,
            new_positions,
            self.n_bins,
            self.w_atom,
            self.w_spatial,
            self._cutoff,
            self.cov_scale,
        )
        f_new = _eval_objective(new_metrics, self.objective)
        # ── Hard connectivity constraint ──────────────────────────────
        if new_metrics.get("graph_lcc", 0.0) < self.lcc_threshold:
            continue
        # ── Accept / reject (Metropolis) ─────────────────────────────
        delta = f_new - f_current
        accept = delta >= 0 or (
            T > 1e-12 and rng.random() < math.exp(delta / T)
        )
        if accept:
            atoms = new_atoms
            positions = new_positions
            metrics = new_metrics
            f_current = f_new
            # BUGFIX: the old guard (`atoms is not new_atoms or
            # new_atoms != list(atoms)`) was evaluated AFTER the rebinding
            # above and was therefore always False — the radii cache was
            # never refreshed after an accepted composition move, feeding
            # stale radii to the relaxer on all subsequent steps.
            radii = new_radii
            # Update per_atom_q6 for next fragment selection
            new_pts = np.array(positions)
            per_atom_q6 = compute_steinhardt_per_atom(
                new_pts, [6], self._cutoff
            )["Q6"]
            if f_current > best_f:
                best_f = f_current
                best_atoms = list(atoms)
                best_positions = list(positions)
                best_metrics = dict(metrics)
        # ── Logging ──────────────────────────────────────────────────
        if self.verbose and step % log_interval == 0:
            self._log(
                f"[restart={restart_idx + 1} "
                f"step={step + 1:>{width}}/{self.max_steps}] "
                f"T={T:.4f} f={f_current:.4f} best={best_f:.4f} "
                + " ".join(f"{k}={_fmt(v)}" for k, v in metrics.items())
            )
    return Structure(
        atoms=best_atoms,
        positions=best_positions,
        charge=self.charge,
        mult=self.mult,
        metrics=best_metrics,
        mode=f"opt_{self.method}",
        sample_index=restart_idx + 1,
        seed=self.seed,
    )
# ------------------------------------------------------------------ #
# Public interface #
# ------------------------------------------------------------------ #
[docs]
def run(self, initial: Structure | None = None) -> OptimizationResult:
    """Execute ``n_restarts`` optimizations and return the ranked results.

    Each restart starts from an independently generated random gas-mode
    structure (or from *initial* when one is supplied).  Every per-restart
    structure is scored with the configured objective and the whole set is
    returned sorted best-first.

    The returned :class:`OptimizationResult` is list-compatible:
    ``result[0]`` and ``result.best`` both give the top structure, and
    iterating the result walks all restarts in rank order.

    Parameters
    ----------
    initial:
        Starting structure for every restart.  When ``None`` (default) a
        fresh random gas-mode structure is generated per restart.

    Returns
    -------
    OptimizationResult
        All per-restart structures sorted by objective value (highest
        first) together with their scores and summary metadata.

    Raises
    ------
    RuntimeError
        When every restart fails to produce a valid initial structure.

    Warns
    -----
    UserWarning
        When one or more (but not all) restarts had to be skipped because
        no valid initial structure could be generated.
    """
    rng = random.Random(self.seed)
    scored: list[tuple[float, Structure]] = []
    attempted = 0

    for idx in range(self.n_restarts):
        self._log(f"[optimize] restart {idx + 1}/{self.n_restarts} start")

        # ── Parallel Tempering: one call runs all replicas at once ────
        if self.method == "parallel_tempering":
            attempted += 1
            replica_states = self._run_parallel_tempering(initial, idx)
            scored.extend(replica_states)
            top = max(replica_states, key=lambda pair: pair[0])
            self._log(
                f"[optimize] restart {idx + 1}/{self.n_restarts} done: "
                f"best_f={top[0]:.4f} ({len(replica_states)} replica states)"
            )
            continue

        # ── Simulated annealing / basin hopping path ──────────────────
        start = initial if initial is not None else self._make_initial(rng)
        if start is None:
            self._log(
                f"[optimize] restart {idx + 1}: "
                "could not generate initial structure, skipping"
            )
            continue
        attempted += 1
        structure = self._run_one(start, idx)
        score = _eval_objective(structure.metrics, self.objective)
        self._log(
            f"[optimize] restart {idx + 1}/{self.n_restarts} done: f={score:.4f}"
        )
        scored.append((score, structure))

    if not scored:
        raise RuntimeError(
            "Optimization failed: no valid structure found across all restarts."
        )

    skipped = self.n_restarts - attempted
    if skipped > 0:
        warnings.warn(
            f"{skipped} of {self.n_restarts} restart(s) were skipped because "
            f"no valid initial structure could be generated. "
            f"Try narrowing the element pool to satisfy "
            f"charge={self.charge}, mult={self.mult}.",
            UserWarning,
            stacklevel=2,
        )

    # Rank all collected states best-first; sort is stable for ties.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    self._log(f"[optimize] best f={scored[0][0]:.4f}")
    return OptimizationResult(
        all_structures=[structure for _, structure in scored],
        objective_scores=[score for score, _ in scored],
        n_restarts_attempted=attempted,
        method=self.method,
    )
# ------------------------------------------------------------------ #
# Properties / dunder #
# ------------------------------------------------------------------ #
@property
def element_pool(self) -> list[str]:
    """Resolved element pool, returned as an independent list copy."""
    return [*self._element_pool]
@property
def cutoff(self) -> float:
    """Neighbour-distance cutoff in Å used by the Steinhardt and graph metrics."""
    return self._cutoff
def __repr__(self) -> str:
    """Return a constructor-style one-line summary of the optimizer settings."""
    # Optional suffixes, appended in a fixed order so the output is stable.
    extras = ""
    if self.method == "parallel_tempering":
        extras += (
            f", n_replicas={self.n_replicas}"
            f", pt_swap_interval={self.pt_swap_interval}"
        )
    if not self.allow_composition_moves:
        extras += ", allow_composition_moves=False"
    if not self.allow_displacements:
        extras += ", allow_displacements=False"
    return (
        f"StructureOptimizer("
        f"n_atoms={self.n_atoms}, method={self.method!r}, "
        f"max_steps={self.max_steps}, "
        f"T_start={self.T_start}, T_end={self.T_end}, "
        f"pool_size={len(self._element_pool)}"
        f"{extras})"
    )