chore: better project structure

This commit is contained in:
2026-02-26 18:02:50 -03:00
parent 5d76e49f89
commit cb4b21ab6c
25 changed files with 102 additions and 96 deletions

View File

@@ -0,0 +1,6 @@
from .__interface import Interface
from .dice_interface import DiceInterface
from .gaussian_interface import GaussianInterface
__all__ = ["Interface", "DiceInterface", "GaussianInterface"]

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from diceplayer.config.player_config import PlayerConfig
from diceplayer.environment.system import System
from abc import ABC, abstractmethod
class Interface(ABC):
__slots__ = ["step", "system"]
def __init__(self):
self.system: System | None = None
self.step: PlayerConfig | None = None
@abstractmethod
def configure(self, step: PlayerConfig, system: System):
pass
@abstractmethod
def start(self, cycle: int):
pass
@abstractmethod
def reset(self):
pass

View File

@@ -0,0 +1,389 @@
from __future__ import annotations
from diceplayer import logger
from diceplayer.config.player_config import PlayerConfig
from diceplayer.environment.system import System
from diceplayer.interface import Interface
from setproctitle import setproctitle
from typing_extensions import Final, TextIO
import os
import random
import shutil
import subprocess
import sys
import time
from multiprocessing import Process, connection
from pathlib import Path
DICE_END_FLAG: Final[str] = "End of simulation"
DICE_FLAG_LINE: Final[int] = -2
UMAANG3_TO_GCM3: Final[float] = 1.6605
MAX_SEED: Final[int] = 4294967295
class DiceInterface(Interface):
title = "Diceplayer run"
def configure(self, step: PlayerConfig, system: System):
self.step = step
self.system = system
def start(self, cycle: int):
procs = []
sentinels = []
for proc in range(1, self.step.nprocs + 1):
p = Process(target=self._simulation_process, args=(cycle, proc))
p.start()
procs.append(p)
sentinels.append(p.sentinel)
while procs:
finished = connection.wait(sentinels)
for proc_sentinel in finished:
i = sentinels.index(proc_sentinel)
status = procs[i].exitcode
procs.pop(i)
sentinels.pop(i)
if status != 0:
for p in procs:
p.terminate()
sys.exit(status)
logger.info("\n")
def reset(self):
del self.step
del self.system
def _simulation_process(self, cycle: int, proc: int):
setproctitle(f"diceplayer-step{cycle:0d}-p{proc:0d}")
try:
self._make_proc_dir(cycle, proc)
self._make_dice_inputs(cycle, proc)
self._run_dice(cycle, proc)
except Exception as err:
sys.exit(err)
def _make_proc_dir(self, cycle, proc):
simulation_dir = Path(self.step.simulation_dir)
if not simulation_dir.exists():
simulation_dir.mkdir(parents=True)
proc_dir = Path(simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
proc_dir.mkdir(parents=True, exist_ok=True)
def _make_dice_inputs(self, cycle, proc):
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
self._make_potentials(proc_dir)
random.seed(self._make_dice_seed())
# This is logic is used to make the initial configuration file
# for the next cycle using the last.xyz file from the previous cycle.
if self.step.dice.randominit == "first" and cycle > 1:
last_xyz = Path(
self.step.simulation_dir,
f"step{(cycle - 1):02d}",
f"p{proc:02d}",
"last.xyz",
)
if not last_xyz.exists():
raise FileNotFoundError(f"File {last_xyz} not found.")
with open(last_xyz, "r") as last_xyz_file:
self._make_init_file(proc_dir, last_xyz_file)
last_xyz_file.seek(0)
self.step.dice.dens = self._new_density(last_xyz_file)
else:
self._make_nvt_ter(cycle, proc_dir)
if len(self.step.dice.nstep) == 2:
self._make_nvt_eq(cycle, proc_dir)
elif len(self.step.dice.nstep) == 3:
self._make_npt_ter(cycle, proc_dir)
self._make_npt_eq(proc_dir)
def _run_dice(self, cycle: int, proc: int):
working_dir = os.getcwd()
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
logger.info(
f"Simulation process {str(proc_dir)} initiated with pid {os.getpid()}"
)
os.chdir(proc_dir)
if not (self.step.dice.randominit == "first" and cycle > 1):
self.run_dice_file(cycle, proc, "NVT.ter")
if len(self.step.dice.nstep) == 2:
self.run_dice_file(cycle, proc, "NVT.eq")
elif len(self.step.dice.nstep) == 3:
self.run_dice_file(cycle, proc, "NPT.ter")
self.run_dice_file(cycle, proc, "NPT.eq")
os.chdir(working_dir)
xyz_file = Path(proc_dir, "phb.xyz")
last_xyz_file = Path(proc_dir, "last.xyz")
if xyz_file.exists():
shutil.copy(xyz_file, last_xyz_file)
else:
raise FileNotFoundError(f"File {xyz_file} not found.")
@staticmethod
def _make_dice_seed() -> int:
num = time.time()
num = (num - int(num)) * 1e6
num = int((num - int(num)) * 1e6)
return (os.getpid() * num) % (MAX_SEED + 1)
def _make_init_file(self, proc_dir: Path, last_xyz_file: TextIO):
xyz_lines = last_xyz_file.readlines()
SECONDARY_MOLECULE_LENGTH = 0
for i in range(1, len(self.step.dice.nmol)):
SECONDARY_MOLECULE_LENGTH += self.step.dice.nmol[i] * len(
self.system.molecule[i].atom
)
xyz_lines = xyz_lines[-SECONDARY_MOLECULE_LENGTH:]
input_file = Path(proc_dir, self.step.dice.outname + ".xy")
with open(input_file, "w") as f:
for atom in self.system.molecule[0].atom:
f.write(f"{atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n")
for line in xyz_lines:
atom = line.split()
rx = float(atom[1])
ry = float(atom[2])
rz = float(atom[3])
f.write(f"{rx:>10.6f} {ry:>10.6f} {rz:>10.6f}\n")
f.write("$end")
def _new_density(self, last_xyz_file: TextIO):
last_xyz_lines = last_xyz_file.readlines()
box = last_xyz_lines[1].split()
volume = float(box[-3]) * float(box[-2]) * float(box[-1])
total_mass = 0
for i in range(len(self.system.molecule)):
total_mass += self.system.molecule[i].total_mass * self.step.dice.nmol[i]
density = (total_mass / volume) * UMAANG3_TO_GCM3
return density
def _make_nvt_ter(self, cycle, proc_dir):
file = Path(proc_dir, "NVT.ter")
with open(file, "w") as f:
f.write(f"title = {self.title} - NVT Thermalization\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
f.write(f"outname = {self.step.dice.outname}\n")
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
f.write(f"nmol = {mol_string}\n")
f.write(f"dens = {self.step.dice.dens}\n")
f.write(f"temp = {self.step.dice.temp}\n")
if self.step.dice.randominit == "first" and cycle > 1:
f.write("init = yesreadxyz\n")
f.write(f"nstep = {self.step.altsteps}\n")
else:
f.write("init = yes\n")
f.write(f"nstep = {self.step.dice.nstep[0]}\n")
f.write("vstep = 0\n")
f.write("mstop = 1\n")
f.write("accum = no\n")
f.write("iprint = 1\n")
f.write("isave = 0\n")
f.write("irdf = 0\n")
seed = int(1e6 * random.random())
f.write(f"seed = {seed}\n")
f.write(f"upbuf = {self.step.dice.upbuf}")
def _make_nvt_eq(self, cycle, proc_dir):
file = Path(proc_dir, "NVT.eq")
with open(file, "w") as f:
f.write(f"title = {self.title} - NVT Production\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
f.write(f"outname = {self.step.dice.outname}\n")
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
f.write(f"nmol = {mol_string}\n")
f.write(f"dens = {self.step.dice.dens}\n")
f.write(f"temp = {self.step.dice.temp}\n")
if self.step.dice.randominit == "first" and cycle > 1:
f.write("init = yesreadxyz\n")
else:
f.write("init = no\n")
f.write(f"nstep = {self.step.dice.nstep[1]}\n")
f.write("vstep = 0\n")
f.write("mstop = 1\n")
f.write("accum = no\n")
f.write("iprint = 1\n")
f.write(f"isave = {self.step.dice.isave}\n")
f.write(f"irdf = {10 * self.step.nprocs}\n")
seed = int(1e6 * random.random())
f.write("seed = {}\n".format(seed))
def _make_npt_ter(self, cycle, proc_dir):
file = Path(proc_dir, "NPT.ter")
with open(file, "w") as f:
f.write(f"title = {self.title} - NPT Thermalization\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
f.write(f"outname = {self.step.dice.outname}\n")
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
f.write(f"nmol = {mol_string}\n")
f.write(f"press = {self.step.dice.press}\n")
f.write(f"temp = {self.step.dice.temp}\n")
if self.step.dice.randominit == "first" and cycle > 1:
f.write("init = yesreadxyz\n")
f.write(f"dens = {self.step.dice.dens:<8.4f}\n")
f.write(f"vstep = {int(self.step.altsteps / 5)}\n")
else:
f.write("init = no\n")
f.write(f"vstep = {int(self.step.dice.nstep[1] / 5)}\n")
f.write("nstep = 5\n")
f.write("mstop = 1\n")
f.write("accum = no\n")
f.write("iprint = 1\n")
f.write("isave = 0\n")
f.write("irdf = 0\n")
seed = int(1e6 * random.random())
f.write(f"seed = {seed}\n")
def _make_npt_eq(self, proc_dir):
file = Path(proc_dir, "NPT.eq")
with open(file, "w") as f:
f.write(f"title = {self.title} - NPT Production\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
f.write(f"outname = {self.step.dice.outname}\n")
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
f.write(f"nmol = {mol_string}\n")
f.write(f"press = {self.step.dice.press}\n")
f.write(f"temp = {self.step.dice.temp}\n")
f.write("nstep = 5\n")
f.write(f"vstep = {int(self.step.dice.nstep[2] / 5)}\n")
f.write("init = no\n")
f.write("mstop = 1\n")
f.write("accum = no\n")
f.write("iprint = 1\n")
f.write(f"isave = {self.step.dice.isave}\n")
f.write(f"irdf = {10 * self.step.nprocs}\n")
seed = int(1e6 * random.random())
f.write(f"seed = {seed}\n")
def _make_potentials(self, proc_dir):
fstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f}\n"
file = Path(proc_dir, self.step.dice.ljname)
with open(file, "w") as f:
f.write(f"{self.step.dice.combrule}\n")
f.write(f"{len(self.step.dice.nmol)}\n")
nsites_qm = len(self.system.molecule[0].atom)
f.write(f"{nsites_qm} {self.system.molecule[0].molname}\n")
for atom in self.system.molecule[0].atom:
f.write(
fstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
)
)
for mol in self.system.molecule[1:]:
f.write(f"{len(mol.atom)} {mol.molname}\n")
for atom in mol.atom:
f.write(
fstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
)
)
def run_dice_file(self, cycle: int, proc: int, file_name: str):
with (
open(Path(file_name), "r") as infile,
open(Path(file_name + ".out"), "w") as outfile,
):
if shutil.which("bash") is not None:
exit_status = subprocess.call(
[
"bash",
"-c",
f"exec -a dice-step{cycle}-p{proc} {self.step.dice.progname} < {infile.name} > {outfile.name}",
]
)
else:
exit_status = subprocess.call(
self.step.dice.progname, stdin=infile, stdout=outfile
)
if exit_status != 0:
raise RuntimeError(
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
)
with open(Path(file_name + ".out"), "r") as outfile:
flag = outfile.readlines()[DICE_FLAG_LINE].strip()
if flag != DICE_END_FLAG:
raise RuntimeError(
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
)
logger.info(f"Dice {file_name} - step{cycle:02d}-p{proc:02d} exited properly")

View File

@@ -0,0 +1,359 @@
from __future__ import annotations
from diceplayer import logger
from diceplayer.config.player_config import PlayerConfig
from diceplayer.environment.atom import Atom
from diceplayer.environment.molecule import Molecule
from diceplayer.environment.system import System
from diceplayer.interface import Interface
from diceplayer.utils.misc import date_time
from diceplayer.utils.ptable import atomsymb
import numpy as np
from nptyping import NDArray
from typing_extensions import Any, Dict, List, Tuple
import os
import shutil
import subprocess
import textwrap
from pathlib import Path
class GaussianInterface(Interface):
def configure(self, step_dto: PlayerConfig, system: System):
self.system = system
self.step = step_dto
def start(self, cycle: int) -> Dict[str, NDArray]:
self._make_qm_dir(cycle)
if cycle > 1:
self._copy_chk_file_from_previous_step(cycle)
asec_charges = self.populate_asec_vdw(cycle)
self._make_gaussian_input_file(cycle, asec_charges)
self._run_gaussian(cycle)
self._run_formchk(cycle)
return_value = {}
if self.step.opt:
# return_value['position'] = np.array(
# self._run_optimization(cycle)
# )
raise NotImplementedError("Optimization not implemented yet.")
else:
return_value["charges"] = np.array(self._read_charges_from_fchk(cycle))
return return_value
def reset(self):
del self.step
del self.system
def _make_qm_dir(self, cycle: int):
qm_dir_path = Path(self.step.simulation_dir, f"step{cycle:02d}", "qm")
if not qm_dir_path.exists():
qm_dir_path.mkdir()
def _copy_chk_file_from_previous_step(self, cycle: int):
current_chk_file_path = Path(
self.step.simulation_dir, f"step{cycle:02d}", "qm", "asec.chk"
)
if current_chk_file_path.exists():
raise FileExistsError(f"File {current_chk_file_path} already exists.")
previous_chk_file_path = Path(
self.step.simulation_dir, f"step{(cycle - 1):02d}", "qm", "asec.chk"
)
if not previous_chk_file_path.exists():
raise FileNotFoundError(f"File {previous_chk_file_path} does not exist.")
shutil.copy(previous_chk_file_path, current_chk_file_path)
def populate_asec_vdw(self, cycle: int) -> list[dict]:
norm_factor = self._calculate_norm_factor()
nsitesref = len(self.system.molecule[0].atom)
nsites_total = self._calculate_total_number_of_sites(nsitesref)
proc_charges = []
for proc in range(1, self.step.nprocs + 1):
proc_charges.append(self._read_charges_from_last_step(cycle, proc))
asec_charges, thickness, picked_mols = self._evaluate_proc_charges(
nsites_total, proc_charges
)
logger.info(
f"In average, {(sum(picked_mols) / norm_factor):^7.2f} molecules\n"
f"were selected from each of the {len(picked_mols)} configurations\n"
f"of the production simulations to form the ASEC, comprising a shell with\n"
f"minimum thickness of {(sum(thickness) / norm_factor):>6.2f} Angstrom\n"
)
for charge in asec_charges:
charge["chg"] = charge["chg"] / norm_factor
return asec_charges
def _calculate_norm_factor(self) -> int:
if self.step.dice.nstep[-1] % self.step.dice.isave == 0:
nconfigs = round(self.step.dice.nstep[-1] / self.step.dice.isave)
else:
nconfigs = int(self.step.dice.nstep[-1] / self.step.dice.isave)
return nconfigs * self.step.nprocs
def _calculate_total_number_of_sites(self, nsitesref) -> int:
nsites_total = self.step.dice.nmol[0] * nsitesref
for i in range(1, len(self.step.dice.nmol)):
nsites_total += self.step.dice.nmol[i] * len(self.system.molecule[i].atom)
return nsites_total
def _read_charges_from_last_step(self, cycle: int, proc: int) -> list[str]:
last_xyz_file_path = Path(
self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}", "last.xyz"
)
if not last_xyz_file_path.exists():
raise FileNotFoundError(f"File {last_xyz_file_path} does not exist.")
with open(last_xyz_file_path, "r") as last_xyz_file:
lines = last_xyz_file.readlines()
return lines
def _evaluate_proc_charges(
self, total_nsites: int, proc_charges: list[list[str]]
) -> Tuple[List[Dict[str, float | Any]], List[float], List[int]]:
asec_charges = []
thickness = []
picked_mols = []
for charges in proc_charges:
charges_nsites = int(charges.pop(0))
if int(charges_nsites) != total_nsites:
raise ValueError(
"Number of sites does not match total number of sites."
)
thickness.append(self._calculate_proc_thickness(charges))
nsites_ref_mol = len(self.system.molecule[0].atom)
charges = charges[nsites_ref_mol:]
mol_count = 0
for type in range(len(self.step.dice.nmol)):
if type == 0:
# Reference Molecule must be ignored from type 0
nmols = self.step.dice.nmol[type] - 1
else:
nmols = self.step.dice.nmol[type]
for mol in range(nmols):
new_molecule = Molecule("ASEC TMP MOLECULE")
for site in range(len(self.system.molecule[type].atom)):
line = charges.pop(0).split()
if (
line[0].title()
!= atomsymb[
self.system.molecule[type].atom[site].na
].strip()
):
raise SyntaxError(
"Error: Invalid Dice Output. Atom type does not match."
)
new_molecule.add_atom(
Atom(
self.system.molecule[type].atom[site].lbl,
self.system.molecule[type].atom[site].na,
float(line[1]),
float(line[2]),
float(line[3]),
self.system.molecule[type].atom[site].chg,
self.system.molecule[type].atom[site].eps,
self.system.molecule[type].atom[site].sig,
)
)
distance = self.system.molecule[0].minimum_distance(new_molecule)
if distance < thickness[-1]:
for atom in new_molecule.atom:
asec_charges.append(
{
"lbl": atomsymb[atom.na],
"rx": atom.rx,
"ry": atom.ry,
"rz": atom.rz,
"chg": atom.chg,
}
)
mol_count += 1
picked_mols.append(mol_count)
return asec_charges, thickness, picked_mols
def _calculate_proc_thickness(self, charges: list[str]) -> float:
box = charges.pop(0).split()[-3:]
box = [float(box[0]), float(box[1]), float(box[2])]
sizes = self.system.molecule[0].sizes_of_molecule()
return min(
[
(box[0] - sizes[0]) / 2,
(box[1] - sizes[1]) / 2,
(box[2] - sizes[2]) / 2,
]
)
def _make_gaussian_input_file(self, cycle: int, asec_charges: list[dict]) -> None:
gaussian_input_file_path = Path(
self.step.simulation_dir, f"step{cycle:02d}", "qm", "asec.gjf"
)
with open(gaussian_input_file_path, "w") as gaussian_input_file:
gaussian_input_file.writelines(
self._generate_gaussian_input(cycle, asec_charges)
)
def _generate_gaussian_input(
self, cycle: int, asec_charges: list[dict]
) -> list[str]:
gaussian_input = ["%Chk=asec.chk\n"]
if self.step.mem is not None:
gaussian_input.append(f"%Mem={self.step.mem}GB\n")
gaussian_input.append(f"%Nprocs={self.step.nprocs * self.step.ncores}\n")
kwords_line = f"#P {self.step.gaussian.level}"
if self.step.gaussian.keywords:
kwords_line += " " + self.step.gaussian.keywords
if self.step.opt == "yes":
kwords_line += " Force"
kwords_line += " NoSymm"
kwords_line += f" Pop={self.step.gaussian.pop} Density=Current"
if cycle > 1:
kwords_line += " Guess=Read"
gaussian_input.append(textwrap.fill(kwords_line, 90))
gaussian_input.append("\n")
gaussian_input.append("\nForce calculation - Cycle number {}\n".format(cycle))
gaussian_input.append("\n")
gaussian_input.append(
f"{self.step.gaussian.chgmult[0]},{self.step.gaussian.chgmult[1]}\n"
)
for atom in self.system.molecule[0].atom:
symbol = atomsymb[atom.na]
gaussian_input.append(
"{:<2s} {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
symbol, atom.rx, atom.ry, atom.rz
)
)
gaussian_input.append("\n")
for charge in asec_charges:
gaussian_input.append(
"{:>10.5f} {:>10.5f} {:>10.5f} {:>11.8f}\n".format(
charge["rx"], charge["ry"], charge["rz"], charge["chg"]
)
)
gaussian_input.append("\n")
return gaussian_input
def _run_gaussian(self, cycle: int) -> None:
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
working_dir = os.getcwd()
os.chdir(qm_dir)
infile = "asec.gjf"
operation = None
if self.step.opt:
operation = "forces"
else:
operation = "charges"
logger.info(
f"Calculation of {operation} initiated with Gaussian on {date_time()}\n"
)
if shutil.which("bash") is not None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a {}-step{} {} {}".format(
self.step.gaussian.qmprog,
cycle,
self.step.gaussian.qmprog,
infile,
),
]
)
else:
exit_status = subprocess.call([self.step.gaussian.qmprog, infile])
if exit_status != 0:
raise SystemError("Gaussian process did not exit properly")
logger.info(f"Calculation of {operation} finished on {date_time()}")
os.chdir(working_dir)
def _run_formchk(self, cycle: int):
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
work_dir = os.getcwd()
os.chdir(qm_dir)
logger.info("Formatting the checkpoint file... \n")
exit_status = subprocess.call(
["formchk", "asec.chk"], stdout=subprocess.DEVNULL
)
if exit_status != 0:
raise SystemError("Formchk process did not exit properly")
logger.info("Done\n")
os.chdir(work_dir)
def _read_charges_from_fchk(self, cycle: int):
fchk_file_path = Path("simfiles", f"step{cycle:02d}", "qm", "asec.fchk")
with open(fchk_file_path) as fchk:
fchkfile = fchk.readlines()
if self.step.gaussian.pop in ["chelpg", "mk"]:
CHARGE_FLAG = "ESP Charges"
else:
CHARGE_FLAG = "ESP Charges"
start = fchkfile.pop(0).strip()
while start.find(CHARGE_FLAG) != 0: # expression in begining of line
start = fchkfile.pop(0).strip()
charges: List[float] = []
while len(charges) < len(self.system.molecule[0].atom):
charges.extend([float(x) for x in fchkfile.pop(0).split()])
return charges