feat: reads potentions from ljc file

This commit is contained in:
2026-03-16 01:24:01 -03:00
parent 30be88e6b4
commit 0763c4a9e1
19 changed files with 250 additions and 210 deletions

View File

@@ -1,10 +1,14 @@
import shutil
from diceplayer.dice.dice_input import NVTTerConfig, NVTEqConfig, NPTEqConfig, NPTTerConfig
from diceplayer.dice.dice_input import (
NPTEqConfig,
NPTTerConfig,
NVTEqConfig,
NVTTerConfig,
)
from diceplayer.dice.dice_wrapper import DiceWrapper
from diceplayer.logger import logger
from diceplayer.state.state_model import StateModel
import shutil
from pathlib import Path
from threading import Thread
@@ -15,7 +19,9 @@ class DiceHandler:
def run(self, state: StateModel, cycle: int) -> StateModel:
if self.dice_directory.exists():
logger.info(f"Found dice directory: {self.dice_directory}, this directory will be purged for a clean state")
logger.info(
f"Found dice directory: {self.dice_directory}, this directory will be purged for a clean state"
)
shutil.rmtree(self.dice_directory)
self.dice_directory.mkdir(parents=True)
@@ -38,7 +44,9 @@ class DiceHandler:
t.join()
if len(results) != state.config.dice.nprocs:
raise RuntimeError(f"Expected {state.config.dice.nprocs} simulation results, but got {len(results)}")
raise RuntimeError(
f"Expected {state.config.dice.nprocs} simulation results, but got {len(results)}"
)
return results
@@ -47,21 +55,23 @@ class DiceHandler:
def commit_simulation_state(self, state: StateModel, result: dict) -> StateModel:
return state
def _simulation_process(self, state: StateModel, cycle: int, proc: int, results: list[dict]) -> None:
def _simulation_process(
self, state: StateModel, cycle: int, proc: int, results: list[dict]
) -> None:
proc_directory = self.dice_directory / f"{proc:02d}"
if proc_directory.exists():
shutil.rmtree(proc_directory)
proc_directory.mkdir(parents=True)
dice = DiceWrapper(
state.config.dice, proc_directory
)
dice = DiceWrapper(state.config.dice, proc_directory)
if state.config.dice.randominit == "first" and cycle == 0:
self._generate_phb_file(state, proc_directory)
if state.config.dice.randominit == "first" and cycle >= 0:
self._generate_last_xyz(state, proc_directory)
else:
nvt_ter_config = NVTTerConfig.from_config(state.config)
dice.run(nvt_ter_config)
else:
self._generate_last_xyz(state, proc_directory)
if len(state.config.dice.nstep) == 2:
nvt_eq_config = NVTEqConfig.from_config(state.config)
@@ -76,5 +86,30 @@ class DiceHandler:
results.append(dice.extract_results())
def _generate_last_xyz(self, state: StateModel, proc_directory: Path) -> None:
...
@staticmethod
def _generate_phb_file(state: StateModel, proc_directory: Path) -> None:
fstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f}\n"
phb_file = proc_directory / state.config.dice.ljname
with open(phb_file, "w") as f:
f.write(f"{state.config.dice.combrule}\n")
f.write(f"{len(state.config.dice.nmol)}\n")
for molecule in state.system.molecule:
f.write(f"{len(molecule.atom)} {molecule.molname}\n")
for atom in molecule.atom:
f.write(
fstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
)
)
def _generate_last_xyz(self, state: StateModel, proc_directory: Path) -> None: ...

View File

@@ -9,6 +9,30 @@ from pathlib import Path
from typing import Any, Sequence, TextIO
DICE_KEYWORD_ORDER = [
"title",
"ncores",
"ljname",
"outname",
"nmol",
"dens",
"temp",
"press",
"seed",
"init",
"nstep",
"vstep",
"mstop",
"accum",
"iprint",
"isave",
"irdf",
"upbuf",
]
@dataclass(slots=True)
class BaseConfig(ABC):
ncores: int
@@ -18,13 +42,14 @@ class BaseConfig(ABC):
temp: float
seed: int
isave: int
press: float = 1.0
def write(self, directory: Path, filename: str = "input") -> Path:
input_path = directory / filename
if input_path.exists():
logger.info(f"Dice input file {input_path} already exists and will be overwritten")
logger.info(
f"Dice input file {input_path} already exists and will be overwritten"
)
input_path.unlink()
input_path.parent.mkdir(parents=True, exist_ok=True)
@@ -34,13 +59,18 @@ class BaseConfig(ABC):
return input_path
def write_dice_config(self, io_writer: TextIO) -> None:
for field in fields(self):
key = field.name
value = getattr(self, key)
values = {f.name: getattr(self, f.name) for f in fields(self)}
for key in DICE_KEYWORD_ORDER:
value = values.pop(key, None)
if value is None:
continue
io_writer.write(f"{key} = {self._serialize_value(value)}\n")
# write any remaining fields (future extensions)
for key, value in values.items():
if value is None:
continue
io_writer.write(f"{key} = {self._serialize_value(value)}\n")
io_writer.write("$end\n")
@@ -48,7 +78,7 @@ class BaseConfig(ABC):
@classmethod
def from_config(cls, config: PlayerConfig, **kwargs) -> Self:
base_fields = cls._extract_base_fields(config)
return cls(**base_fields, **kwargs)
return cls(**(base_fields | kwargs))
@staticmethod
def _extract_base_fields(config: PlayerConfig) -> dict[str, Any]:
@@ -60,7 +90,6 @@ class BaseConfig(ABC):
temp=config.dice.temp,
seed=config.dice.seed,
isave=config.dice.isave,
press=config.dice.press,
)
@staticmethod
@@ -91,10 +120,18 @@ class BaseConfig(ABC):
@dataclass(slots=True)
class NVTConfig(BaseConfig):
title: str = "Diceplayer Run - NVT"
dens: float = 0.0
nstep: int = 0
dens: float = ...
nstep: int = ...
vstep: int = 0
@classmethod
def from_config(cls, config: PlayerConfig, **kwargs) -> Self:
return super(NVTConfig, cls).from_config(
config,
dens=config.dice.dens,
nstep=cls._get_nstep(config, 0),
)
# -----------------------------------------------------
# NVT THERMALIZATION
@@ -105,12 +142,12 @@ class NVTConfig(BaseConfig):
class NVTTerConfig(NVTConfig):
title: str = "Diceplayer Run - NVT Thermalization"
upbuf: int = 360
init: str = "yes"
@classmethod
def from_config(cls, config: PlayerConfig, **kwargs) -> Self:
return super(NVTTerConfig, cls).from_config(
config,
dens=config.dice.dens,
nstep=cls._get_nstep(config, 0),
upbuf=config.dice.upbuf,
vstep=0,
@@ -118,9 +155,7 @@ class NVTTerConfig(NVTConfig):
)
def write(self, directory: Path, filename: str = "nvt.ter") -> Path:
return super(NVTTerConfig, self).write(
directory, filename
)
return super(NVTTerConfig, self).write(directory, filename)
# -----------------------------------------------------
@@ -132,12 +167,12 @@ class NVTTerConfig(NVTConfig):
class NVTEqConfig(NVTConfig):
title: str = "Diceplayer Run - NVT Production"
irdf: int = 0
init: str = "yesreadxyz"
@classmethod
def from_config(cls, config: PlayerConfig, **kwargs) -> Self:
return super(NVTEqConfig, cls).from_config(
config,
dens=config.dice.dens,
nstep=cls._get_nstep(config, 1),
irdf=config.dice.irdf,
vstep=0,
@@ -145,9 +180,7 @@ class NVTEqConfig(NVTConfig):
)
def write(self, directory: Path, filename: str = "nvt.eq") -> Path:
return super(NVTEqConfig, self).write(
directory, filename
)
return super(NVTEqConfig, self).write(directory, filename)
# -----------------------------------------------------
@@ -160,6 +193,14 @@ class NPTConfig(BaseConfig):
title: str = "Diceplayer Run - NPT"
nstep: int = 0
vstep: int = 5000
press: float = 1.0
@classmethod
def from_config(cls, config: PlayerConfig, **kwargs) -> Self:
return super(NPTConfig, cls).from_config(
config,
press=config.dice.press,
)
# -----------------------------------------------------
@@ -183,9 +224,7 @@ class NPTTerConfig(NPTConfig):
)
def write(self, directory: Path, filename: str = "npt.ter") -> Path:
return super(NPTTerConfig, self).write(
directory, filename
)
return super(NPTTerConfig, self).write(directory, filename)
# -----------------------------------------------------
@@ -209,6 +248,4 @@ class NPTEqConfig(NPTConfig):
)
def write(self, directory: Path, filename: str = "npt.eq") -> Path:
return super(NPTEqConfig, self).write(
directory, filename
)
return super(NPTEqConfig, self).write(directory, filename)

View File

@@ -1,12 +1,10 @@
import subprocess
from typing import Final
import diceplayer.dice.dice_input as dice_input
from pathlib import Path
from diceplayer.config import DiceConfig
import subprocess
from pathlib import Path
from typing import Final
DICE_FLAG_LINE: Final[int] = -2
DICE_END_FLAG: Final[str] = "End of simulation"
@@ -22,8 +20,9 @@ class DiceWrapper:
output_path = input_path.parent / (input_path.name + ".out")
with open(output_path, "w") as outfile, open(input_path, "r") as infile:
bin_path = self.dice_config.progname.expanduser()
exit_status = subprocess.call(
self.dice_config.progname, stdin=infile, stdout=outfile
bin_path, stdin=infile, stdout=outfile, cwd=self.working_directory
)
if exit_status != 0:
@@ -38,4 +37,3 @@ class DiceWrapper:
def extract_results(self) -> dict:
return {}