refactor: replace Logger with RunLogger and streamline logging setup
This commit is contained in:
@@ -1,8 +1,3 @@
|
||||
from diceplayer.utils import Logger
|
||||
|
||||
from importlib import metadata
|
||||
from diceplayer.utils.logger import RunLogger
|
||||
|
||||
|
||||
VERSION = metadata.version("diceplayer")
|
||||
|
||||
logger = Logger(__name__)
|
||||
|
||||
@@ -1,22 +1,34 @@
|
||||
from diceplayer import VERSION, logger
|
||||
from diceplayer.player import Player
|
||||
import yaml
|
||||
|
||||
from diceplayer.config.player_config import PlayerConfig
|
||||
from diceplayer.logger import logger
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
from importlib import metadata
|
||||
|
||||
from diceplayer.player import Player
|
||||
|
||||
VERSION = metadata.version("diceplayer")
|
||||
|
||||
|
||||
def read_input(infile) -> PlayerConfig:
|
||||
try:
|
||||
with open(infile, "r") as f:
|
||||
return PlayerConfig.model_validate(
|
||||
yaml.safe_load(f)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to read input file")
|
||||
raise e
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Read and store the arguments passed to the program
|
||||
and set the usage and help messages
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser(prog="Diceplayer")
|
||||
parser.add_argument(
|
||||
"-c", "--continue", dest="opt_continue", default=False, action="store_true"
|
||||
"-v", "--version", action="version", version="diceplayer-" + VERSION
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--version", action="version", version="diceplayer-" + VERSION
|
||||
"-c", "--continue", dest="continuation", default=False, action="store_true"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
@@ -36,35 +48,12 @@ def main():
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Open OUTFILE for writing and print keywords and initial info
|
||||
logger.set_logger(args.outfile, logging.INFO)
|
||||
logger.set_output_file(args.outfile)
|
||||
|
||||
if args.opt_continue:
|
||||
player = Player.from_save()
|
||||
else:
|
||||
player = Player.from_file(args.infile)
|
||||
config = read_input(args.infile)
|
||||
|
||||
player.read_potentials()
|
||||
|
||||
player.create_simulation_dir()
|
||||
player.create_geoms_file()
|
||||
|
||||
player.print_keywords()
|
||||
|
||||
player.print_potentials()
|
||||
|
||||
player.prepare_system()
|
||||
|
||||
player.start()
|
||||
|
||||
logger.info("\n+" + 88 * "-" + "+\n")
|
||||
|
||||
player.print_results()
|
||||
|
||||
logger.info("\n+" + 88 * "-" + "+\n")
|
||||
|
||||
logger.info("Diceplayer finished successfully \n")
|
||||
Player(config).play(continuation=args.continuation)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
@@ -1,14 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from diceplayer import logger
|
||||
from diceplayer.logger import logger
|
||||
from diceplayer.environment import Atom
|
||||
from diceplayer.utils.cache import invalidate_computed_properties
|
||||
from diceplayer.utils.misc import BOHR2ANG, EA_2_DEBYE
|
||||
from diceplayer.utils.ptable import GHOST_NUMBER
|
||||
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
import numpy.linalg as linalg
|
||||
import numpy.typing as npt
|
||||
from typing_extensions import List, Self, Tuple
|
||||
|
||||
import math
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
from .__interface import Interface
|
||||
from .dice_interface import DiceInterface
|
||||
from .gaussian_interface import GaussianInterface
|
||||
|
||||
|
||||
__all__ = ["Interface", "DiceInterface", "GaussianInterface"]
|
||||
@@ -1,26 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from diceplayer.config.player_config import PlayerConfig
|
||||
from diceplayer.environment.system import System
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class Interface(ABC):
|
||||
__slots__ = ["step", "system"]
|
||||
|
||||
def __init__(self):
|
||||
self.system: System | None = None
|
||||
self.step: PlayerConfig | None = None
|
||||
|
||||
@abstractmethod
|
||||
def configure(self, step: PlayerConfig, system: System):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start(self, cycle: int):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def reset(self):
|
||||
pass
|
||||
@@ -1,389 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from diceplayer import logger
|
||||
from diceplayer.config.player_config import PlayerConfig
|
||||
from diceplayer.environment.system import System
|
||||
from diceplayer.interface import Interface
|
||||
|
||||
from setproctitle import setproctitle
|
||||
from typing_extensions import Final, TextIO
|
||||
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from multiprocessing import Process, connection
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
DICE_END_FLAG: Final[str] = "End of simulation"
|
||||
DICE_FLAG_LINE: Final[int] = -2
|
||||
UMAANG3_TO_GCM3: Final[float] = 1.6605
|
||||
|
||||
MAX_SEED: Final[int] = 4294967295
|
||||
|
||||
|
||||
class DiceInterface(Interface):
|
||||
title = "Diceplayer run"
|
||||
|
||||
def configure(self, step: PlayerConfig, system: System):
|
||||
self.step = step
|
||||
self.system = system
|
||||
|
||||
def start(self, cycle: int):
|
||||
procs = []
|
||||
sentinels = []
|
||||
|
||||
for proc in range(1, self.step.nprocs + 1):
|
||||
p = Process(target=self._simulation_process, args=(cycle, proc))
|
||||
p.start()
|
||||
|
||||
procs.append(p)
|
||||
sentinels.append(p.sentinel)
|
||||
|
||||
while procs:
|
||||
finished = connection.wait(sentinels)
|
||||
for proc_sentinel in finished:
|
||||
i = sentinels.index(proc_sentinel)
|
||||
status = procs[i].exitcode
|
||||
procs.pop(i)
|
||||
sentinels.pop(i)
|
||||
if status != 0:
|
||||
for p in procs:
|
||||
p.terminate()
|
||||
sys.exit(status)
|
||||
|
||||
logger.info("\n")
|
||||
|
||||
def reset(self):
|
||||
del self.step
|
||||
del self.system
|
||||
|
||||
def _simulation_process(self, cycle: int, proc: int):
|
||||
setproctitle(f"diceplayer-step{cycle:0d}-p{proc:0d}")
|
||||
|
||||
try:
|
||||
self._make_proc_dir(cycle, proc)
|
||||
self._make_dice_inputs(cycle, proc)
|
||||
self._run_dice(cycle, proc)
|
||||
except Exception as err:
|
||||
sys.exit(err)
|
||||
|
||||
def _make_proc_dir(self, cycle, proc):
|
||||
simulation_dir = Path(self.step.simulation_dir)
|
||||
if not simulation_dir.exists():
|
||||
simulation_dir.mkdir(parents=True)
|
||||
|
||||
proc_dir = Path(simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
|
||||
proc_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _make_dice_inputs(self, cycle, proc):
|
||||
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
|
||||
|
||||
self._make_potentials(proc_dir)
|
||||
|
||||
random.seed(self._make_dice_seed())
|
||||
|
||||
# This is logic is used to make the initial configuration file
|
||||
# for the next cycle using the last.xyz file from the previous cycle.
|
||||
if self.step.dice.randominit == "first" and cycle > 1:
|
||||
last_xyz = Path(
|
||||
self.step.simulation_dir,
|
||||
f"step{(cycle - 1):02d}",
|
||||
f"p{proc:02d}",
|
||||
"last.xyz",
|
||||
)
|
||||
if not last_xyz.exists():
|
||||
raise FileNotFoundError(f"File {last_xyz} not found.")
|
||||
|
||||
with open(last_xyz, "r") as last_xyz_file:
|
||||
self._make_init_file(proc_dir, last_xyz_file)
|
||||
last_xyz_file.seek(0)
|
||||
self.step.dice.dens = self._new_density(last_xyz_file)
|
||||
|
||||
else:
|
||||
self._make_nvt_ter(cycle, proc_dir)
|
||||
|
||||
if len(self.step.dice.nstep) == 2:
|
||||
self._make_nvt_eq(cycle, proc_dir)
|
||||
|
||||
elif len(self.step.dice.nstep) == 3:
|
||||
self._make_npt_ter(cycle, proc_dir)
|
||||
self._make_npt_eq(proc_dir)
|
||||
|
||||
def _run_dice(self, cycle: int, proc: int):
|
||||
working_dir = os.getcwd()
|
||||
|
||||
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
|
||||
|
||||
logger.info(
|
||||
f"Simulation process {str(proc_dir)} initiated with pid {os.getpid()}"
|
||||
)
|
||||
|
||||
os.chdir(proc_dir)
|
||||
|
||||
if not (self.step.dice.randominit == "first" and cycle > 1):
|
||||
self.run_dice_file(cycle, proc, "NVT.ter")
|
||||
|
||||
if len(self.step.dice.nstep) == 2:
|
||||
self.run_dice_file(cycle, proc, "NVT.eq")
|
||||
|
||||
elif len(self.step.dice.nstep) == 3:
|
||||
self.run_dice_file(cycle, proc, "NPT.ter")
|
||||
self.run_dice_file(cycle, proc, "NPT.eq")
|
||||
|
||||
os.chdir(working_dir)
|
||||
|
||||
xyz_file = Path(proc_dir, "phb.xyz")
|
||||
last_xyz_file = Path(proc_dir, "last.xyz")
|
||||
|
||||
if xyz_file.exists():
|
||||
shutil.copy(xyz_file, last_xyz_file)
|
||||
else:
|
||||
raise FileNotFoundError(f"File {xyz_file} not found.")
|
||||
|
||||
@staticmethod
|
||||
def _make_dice_seed() -> int:
|
||||
num = time.time()
|
||||
num = (num - int(num)) * 1e6
|
||||
|
||||
num = int((num - int(num)) * 1e6)
|
||||
|
||||
return (os.getpid() * num) % (MAX_SEED + 1)
|
||||
|
||||
def _make_init_file(self, proc_dir: Path, last_xyz_file: TextIO):
|
||||
xyz_lines = last_xyz_file.readlines()
|
||||
|
||||
SECONDARY_MOLECULE_LENGTH = 0
|
||||
for i in range(1, len(self.step.dice.nmol)):
|
||||
SECONDARY_MOLECULE_LENGTH += self.step.dice.nmol[i] * len(
|
||||
self.system.molecule[i].atom
|
||||
)
|
||||
|
||||
xyz_lines = xyz_lines[-SECONDARY_MOLECULE_LENGTH:]
|
||||
|
||||
input_file = Path(proc_dir, self.step.dice.outname + ".xy")
|
||||
with open(input_file, "w") as f:
|
||||
for atom in self.system.molecule[0].atom:
|
||||
f.write(f"{atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n")
|
||||
|
||||
for line in xyz_lines:
|
||||
atom = line.split()
|
||||
rx = float(atom[1])
|
||||
ry = float(atom[2])
|
||||
rz = float(atom[3])
|
||||
f.write(f"{rx:>10.6f} {ry:>10.6f} {rz:>10.6f}\n")
|
||||
|
||||
f.write("$end")
|
||||
|
||||
def _new_density(self, last_xyz_file: TextIO):
|
||||
last_xyz_lines = last_xyz_file.readlines()
|
||||
|
||||
box = last_xyz_lines[1].split()
|
||||
volume = float(box[-3]) * float(box[-2]) * float(box[-1])
|
||||
|
||||
total_mass = 0
|
||||
for i in range(len(self.system.molecule)):
|
||||
total_mass += self.system.molecule[i].total_mass * self.step.dice.nmol[i]
|
||||
|
||||
density = (total_mass / volume) * UMAANG3_TO_GCM3
|
||||
|
||||
return density
|
||||
|
||||
def _make_nvt_ter(self, cycle, proc_dir):
|
||||
file = Path(proc_dir, "NVT.ter")
|
||||
with open(file, "w") as f:
|
||||
f.write(f"title = {self.title} - NVT Thermalization\n")
|
||||
f.write(f"ncores = {self.step.ncores}\n")
|
||||
f.write(f"ljname = {self.step.dice.ljname}\n")
|
||||
f.write(f"outname = {self.step.dice.outname}\n")
|
||||
|
||||
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
|
||||
f.write(f"nmol = {mol_string}\n")
|
||||
|
||||
f.write(f"dens = {self.step.dice.dens}\n")
|
||||
f.write(f"temp = {self.step.dice.temp}\n")
|
||||
|
||||
if self.step.dice.randominit == "first" and cycle > 1:
|
||||
f.write("init = yesreadxyz\n")
|
||||
f.write(f"nstep = {self.step.altsteps}\n")
|
||||
else:
|
||||
f.write("init = yes\n")
|
||||
f.write(f"nstep = {self.step.dice.nstep[0]}\n")
|
||||
|
||||
f.write("vstep = 0\n")
|
||||
f.write("mstop = 1\n")
|
||||
f.write("accum = no\n")
|
||||
f.write("iprint = 1\n")
|
||||
f.write("isave = 0\n")
|
||||
f.write("irdf = 0\n")
|
||||
|
||||
seed = int(1e6 * random.random())
|
||||
f.write(f"seed = {seed}\n")
|
||||
f.write(f"upbuf = {self.step.dice.upbuf}")
|
||||
|
||||
def _make_nvt_eq(self, cycle, proc_dir):
|
||||
file = Path(proc_dir, "NVT.eq")
|
||||
with open(file, "w") as f:
|
||||
f.write(f"title = {self.title} - NVT Production\n")
|
||||
f.write(f"ncores = {self.step.ncores}\n")
|
||||
f.write(f"ljname = {self.step.dice.ljname}\n")
|
||||
f.write(f"outname = {self.step.dice.outname}\n")
|
||||
|
||||
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
|
||||
f.write(f"nmol = {mol_string}\n")
|
||||
|
||||
f.write(f"dens = {self.step.dice.dens}\n")
|
||||
f.write(f"temp = {self.step.dice.temp}\n")
|
||||
|
||||
if self.step.dice.randominit == "first" and cycle > 1:
|
||||
f.write("init = yesreadxyz\n")
|
||||
else:
|
||||
f.write("init = no\n")
|
||||
|
||||
f.write(f"nstep = {self.step.dice.nstep[1]}\n")
|
||||
|
||||
f.write("vstep = 0\n")
|
||||
f.write("mstop = 1\n")
|
||||
f.write("accum = no\n")
|
||||
f.write("iprint = 1\n")
|
||||
|
||||
f.write(f"isave = {self.step.dice.isave}\n")
|
||||
f.write(f"irdf = {10 * self.step.nprocs}\n")
|
||||
|
||||
seed = int(1e6 * random.random())
|
||||
f.write("seed = {}\n".format(seed))
|
||||
|
||||
def _make_npt_ter(self, cycle, proc_dir):
|
||||
file = Path(proc_dir, "NPT.ter")
|
||||
with open(file, "w") as f:
|
||||
f.write(f"title = {self.title} - NPT Thermalization\n")
|
||||
f.write(f"ncores = {self.step.ncores}\n")
|
||||
f.write(f"ljname = {self.step.dice.ljname}\n")
|
||||
f.write(f"outname = {self.step.dice.outname}\n")
|
||||
|
||||
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
|
||||
f.write(f"nmol = {mol_string}\n")
|
||||
|
||||
f.write(f"press = {self.step.dice.press}\n")
|
||||
f.write(f"temp = {self.step.dice.temp}\n")
|
||||
|
||||
if self.step.dice.randominit == "first" and cycle > 1:
|
||||
f.write("init = yesreadxyz\n")
|
||||
f.write(f"dens = {self.step.dice.dens:<8.4f}\n")
|
||||
f.write(f"vstep = {int(self.step.altsteps / 5)}\n")
|
||||
else:
|
||||
f.write("init = no\n")
|
||||
f.write(f"vstep = {int(self.step.dice.nstep[1] / 5)}\n")
|
||||
|
||||
f.write("nstep = 5\n")
|
||||
f.write("mstop = 1\n")
|
||||
f.write("accum = no\n")
|
||||
f.write("iprint = 1\n")
|
||||
f.write("isave = 0\n")
|
||||
f.write("irdf = 0\n")
|
||||
|
||||
seed = int(1e6 * random.random())
|
||||
f.write(f"seed = {seed}\n")
|
||||
|
||||
def _make_npt_eq(self, proc_dir):
|
||||
file = Path(proc_dir, "NPT.eq")
|
||||
with open(file, "w") as f:
|
||||
f.write(f"title = {self.title} - NPT Production\n")
|
||||
f.write(f"ncores = {self.step.ncores}\n")
|
||||
f.write(f"ljname = {self.step.dice.ljname}\n")
|
||||
f.write(f"outname = {self.step.dice.outname}\n")
|
||||
|
||||
mol_string = " ".join(str(x) for x in self.step.dice.nmol)
|
||||
f.write(f"nmol = {mol_string}\n")
|
||||
|
||||
f.write(f"press = {self.step.dice.press}\n")
|
||||
f.write(f"temp = {self.step.dice.temp}\n")
|
||||
|
||||
f.write("nstep = 5\n")
|
||||
|
||||
f.write(f"vstep = {int(self.step.dice.nstep[2] / 5)}\n")
|
||||
f.write("init = no\n")
|
||||
f.write("mstop = 1\n")
|
||||
f.write("accum = no\n")
|
||||
f.write("iprint = 1\n")
|
||||
f.write(f"isave = {self.step.dice.isave}\n")
|
||||
f.write(f"irdf = {10 * self.step.nprocs}\n")
|
||||
|
||||
seed = int(1e6 * random.random())
|
||||
f.write(f"seed = {seed}\n")
|
||||
|
||||
def _make_potentials(self, proc_dir):
|
||||
fstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f}\n"
|
||||
|
||||
file = Path(proc_dir, self.step.dice.ljname)
|
||||
with open(file, "w") as f:
|
||||
f.write(f"{self.step.dice.combrule}\n")
|
||||
f.write(f"{len(self.step.dice.nmol)}\n")
|
||||
|
||||
nsites_qm = len(self.system.molecule[0].atom)
|
||||
f.write(f"{nsites_qm} {self.system.molecule[0].molname}\n")
|
||||
|
||||
for atom in self.system.molecule[0].atom:
|
||||
f.write(
|
||||
fstr.format(
|
||||
atom.lbl,
|
||||
atom.na,
|
||||
atom.rx,
|
||||
atom.ry,
|
||||
atom.rz,
|
||||
atom.chg,
|
||||
atom.eps,
|
||||
atom.sig,
|
||||
)
|
||||
)
|
||||
|
||||
for mol in self.system.molecule[1:]:
|
||||
f.write(f"{len(mol.atom)} {mol.molname}\n")
|
||||
for atom in mol.atom:
|
||||
f.write(
|
||||
fstr.format(
|
||||
atom.lbl,
|
||||
atom.na,
|
||||
atom.rx,
|
||||
atom.ry,
|
||||
atom.rz,
|
||||
atom.chg,
|
||||
atom.eps,
|
||||
atom.sig,
|
||||
)
|
||||
)
|
||||
|
||||
def run_dice_file(self, cycle: int, proc: int, file_name: str):
|
||||
with (
|
||||
open(Path(file_name), "r") as infile,
|
||||
open(Path(file_name + ".out"), "w") as outfile,
|
||||
):
|
||||
if shutil.which("bash") is not None:
|
||||
exit_status = subprocess.call(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
f"exec -a dice-step{cycle}-p{proc} {self.step.dice.progname} < {infile.name} > {outfile.name}",
|
||||
]
|
||||
)
|
||||
else:
|
||||
exit_status = subprocess.call(
|
||||
self.step.dice.progname, stdin=infile, stdout=outfile
|
||||
)
|
||||
|
||||
if exit_status != 0:
|
||||
raise RuntimeError(
|
||||
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
|
||||
)
|
||||
|
||||
with open(Path(file_name + ".out"), "r") as outfile:
|
||||
flag = outfile.readlines()[DICE_FLAG_LINE].strip()
|
||||
if flag != DICE_END_FLAG:
|
||||
raise RuntimeError(
|
||||
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
|
||||
)
|
||||
|
||||
logger.info(f"Dice {file_name} - step{cycle:02d}-p{proc:02d} exited properly")
|
||||
@@ -1,359 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from diceplayer import logger
|
||||
from diceplayer.config.player_config import PlayerConfig
|
||||
from diceplayer.environment import Atom
|
||||
from diceplayer.environment.molecule import Molecule
|
||||
from diceplayer.environment.system import System
|
||||
from diceplayer.interface import Interface
|
||||
from diceplayer.utils.misc import date_time
|
||||
from diceplayer.utils.ptable import PTable
|
||||
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
from typing_extensions import Any, Dict, List, Tuple
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class GaussianInterface(Interface):
|
||||
def configure(self, step_dto: PlayerConfig, system: System):
|
||||
self.system = system
|
||||
self.step = step_dto
|
||||
|
||||
def start(self, cycle: int) -> Dict[str, NDArray]:
|
||||
self._make_qm_dir(cycle)
|
||||
|
||||
if cycle > 1:
|
||||
self._copy_chk_file_from_previous_step(cycle)
|
||||
|
||||
asec_charges = self.populate_asec_vdw(cycle)
|
||||
self._make_gaussian_input_file(cycle, asec_charges)
|
||||
|
||||
self._run_gaussian(cycle)
|
||||
self._run_formchk(cycle)
|
||||
|
||||
return_value = {}
|
||||
if self.step.opt:
|
||||
# return_value['position'] = np.array(
|
||||
# self._run_optimization(cycle)
|
||||
# )
|
||||
raise NotImplementedError("Optimization not implemented yet.")
|
||||
|
||||
else:
|
||||
return_value["charges"] = np.array(self._read_charges_from_fchk(cycle))
|
||||
|
||||
return return_value
|
||||
|
||||
def reset(self):
|
||||
del self.step
|
||||
del self.system
|
||||
|
||||
def _make_qm_dir(self, cycle: int):
|
||||
qm_dir_path = Path(self.step.simulation_dir, f"step{cycle:02d}", "qm")
|
||||
if not qm_dir_path.exists():
|
||||
qm_dir_path.mkdir()
|
||||
|
||||
def _copy_chk_file_from_previous_step(self, cycle: int):
|
||||
current_chk_file_path = Path(
|
||||
self.step.simulation_dir, f"step{cycle:02d}", "qm", "asec.chk"
|
||||
)
|
||||
if current_chk_file_path.exists():
|
||||
raise FileExistsError(f"File {current_chk_file_path} already exists.")
|
||||
|
||||
previous_chk_file_path = Path(
|
||||
self.step.simulation_dir, f"step{(cycle - 1):02d}", "qm", "asec.chk"
|
||||
)
|
||||
if not previous_chk_file_path.exists():
|
||||
raise FileNotFoundError(f"File {previous_chk_file_path} does not exist.")
|
||||
|
||||
shutil.copy(previous_chk_file_path, current_chk_file_path)
|
||||
|
||||
def populate_asec_vdw(self, cycle: int) -> list[dict]:
|
||||
norm_factor = self._calculate_norm_factor()
|
||||
|
||||
nsitesref = len(self.system.molecule[0].atom)
|
||||
|
||||
nsites_total = self._calculate_total_number_of_sites(nsitesref)
|
||||
|
||||
proc_charges = []
|
||||
for proc in range(1, self.step.nprocs + 1):
|
||||
proc_charges.append(self._read_charges_from_last_step(cycle, proc))
|
||||
|
||||
asec_charges, thickness, picked_mols = self._evaluate_proc_charges(
|
||||
nsites_total, proc_charges
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"In average, {(sum(picked_mols) / norm_factor):^7.2f} molecules\n"
|
||||
f"were selected from each of the {len(picked_mols)} configurations\n"
|
||||
f"of the production simulations to form the ASEC, comprising a shell with\n"
|
||||
f"minimum thickness of {(sum(thickness) / norm_factor):>6.2f} Angstrom\n"
|
||||
)
|
||||
|
||||
for charge in asec_charges:
|
||||
charge["chg"] = charge["chg"] / norm_factor
|
||||
|
||||
return asec_charges
|
||||
|
||||
def _calculate_norm_factor(self) -> int:
|
||||
if self.step.dice.nstep[-1] % self.step.dice.isave == 0:
|
||||
nconfigs = round(self.step.dice.nstep[-1] / self.step.dice.isave)
|
||||
else:
|
||||
nconfigs = int(self.step.dice.nstep[-1] / self.step.dice.isave)
|
||||
|
||||
return nconfigs * self.step.nprocs
|
||||
|
||||
def _calculate_total_number_of_sites(self, nsitesref) -> int:
|
||||
nsites_total = self.step.dice.nmol[0] * nsitesref
|
||||
for i in range(1, len(self.step.dice.nmol)):
|
||||
nsites_total += self.step.dice.nmol[i] * len(self.system.molecule[i].atom)
|
||||
|
||||
return nsites_total
|
||||
|
||||
def _read_charges_from_last_step(self, cycle: int, proc: int) -> list[str]:
|
||||
last_xyz_file_path = Path(
|
||||
self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}", "last.xyz"
|
||||
)
|
||||
if not last_xyz_file_path.exists():
|
||||
raise FileNotFoundError(f"File {last_xyz_file_path} does not exist.")
|
||||
|
||||
with open(last_xyz_file_path, "r") as last_xyz_file:
|
||||
lines = last_xyz_file.readlines()
|
||||
|
||||
return lines
|
||||
|
||||
def _evaluate_proc_charges(
|
||||
self, total_nsites: int, proc_charges: list[list[str]]
|
||||
) -> Tuple[List[Dict[str, float | Any]], List[float], List[int]]:
|
||||
asec_charges = []
|
||||
|
||||
thickness = []
|
||||
picked_mols = []
|
||||
|
||||
for charges in proc_charges:
|
||||
charges_nsites = int(charges.pop(0))
|
||||
if int(charges_nsites) != total_nsites:
|
||||
raise ValueError(
|
||||
"Number of sites does not match total number of sites."
|
||||
)
|
||||
|
||||
thickness.append(self._calculate_proc_thickness(charges))
|
||||
nsites_ref_mol = len(self.system.molecule[0].atom)
|
||||
charges = charges[nsites_ref_mol:]
|
||||
|
||||
mol_count = 0
|
||||
for type in range(len(self.step.dice.nmol)):
|
||||
if type == 0:
|
||||
# Reference Molecule must be ignored from type 0
|
||||
nmols = self.step.dice.nmol[type] - 1
|
||||
else:
|
||||
nmols = self.step.dice.nmol[type]
|
||||
|
||||
for mol in range(nmols):
|
||||
new_molecule = Molecule("ASEC TMP MOLECULE")
|
||||
for site in range(len(self.system.molecule[type].atom)):
|
||||
line = charges.pop(0).split()
|
||||
|
||||
if (
|
||||
line[0].title()
|
||||
!= PTable.get_atomic_symbol(
|
||||
self.system.molecule[type].atom[site].na
|
||||
).strip()
|
||||
):
|
||||
raise SyntaxError(
|
||||
"Error: Invalid Dice Output. Atom type does not match."
|
||||
)
|
||||
|
||||
new_molecule.add_atom(
|
||||
Atom(
|
||||
self.system.molecule[type].atom[site].lbl,
|
||||
self.system.molecule[type].atom[site].na,
|
||||
float(line[1]),
|
||||
float(line[2]),
|
||||
float(line[3]),
|
||||
self.system.molecule[type].atom[site].chg,
|
||||
self.system.molecule[type].atom[site].eps,
|
||||
self.system.molecule[type].atom[site].sig,
|
||||
)
|
||||
)
|
||||
|
||||
distance = self.system.molecule[0].minimum_distance(new_molecule)
|
||||
|
||||
if distance < thickness[-1]:
|
||||
for atom in new_molecule.atom:
|
||||
asec_charges.append(
|
||||
{
|
||||
"lbl": PTable.get_atomic_symbol(atom.na),
|
||||
"rx": atom.rx,
|
||||
"ry": atom.ry,
|
||||
"rz": atom.rz,
|
||||
"chg": atom.chg,
|
||||
}
|
||||
)
|
||||
mol_count += 1
|
||||
|
||||
picked_mols.append(mol_count)
|
||||
|
||||
return asec_charges, thickness, picked_mols
|
||||
|
||||
def _calculate_proc_thickness(self, charges: list[str]) -> float:
|
||||
box = charges.pop(0).split()[-3:]
|
||||
box = [float(box[0]), float(box[1]), float(box[2])]
|
||||
sizes = self.system.molecule[0].sizes_of_molecule()
|
||||
|
||||
return min(
|
||||
[
|
||||
(box[0] - sizes[0]) / 2,
|
||||
(box[1] - sizes[1]) / 2,
|
||||
(box[2] - sizes[2]) / 2,
|
||||
]
|
||||
)
|
||||
|
||||
def _make_gaussian_input_file(self, cycle: int, asec_charges: list[dict]) -> None:
|
||||
gaussian_input_file_path = Path(
|
||||
self.step.simulation_dir, f"step{cycle:02d}", "qm", "asec.gjf"
|
||||
)
|
||||
|
||||
with open(gaussian_input_file_path, "w") as gaussian_input_file:
|
||||
gaussian_input_file.writelines(
|
||||
self._generate_gaussian_input(cycle, asec_charges)
|
||||
)
|
||||
|
||||
def _generate_gaussian_input(
|
||||
self, cycle: int, asec_charges: list[dict]
|
||||
) -> list[str]:
|
||||
gaussian_input = ["%Chk=asec.chk\n"]
|
||||
|
||||
if self.step.mem is not None:
|
||||
gaussian_input.append(f"%Mem={self.step.mem}GB\n")
|
||||
|
||||
gaussian_input.append(f"%Nprocs={self.step.nprocs * self.step.ncores}\n")
|
||||
|
||||
kwords_line = f"#P {self.step.gaussian.level}"
|
||||
|
||||
if self.step.gaussian.keywords:
|
||||
kwords_line += " " + self.step.gaussian.keywords
|
||||
|
||||
if self.step.opt == "yes":
|
||||
kwords_line += " Force"
|
||||
|
||||
kwords_line += " NoSymm"
|
||||
kwords_line += f" Pop={self.step.gaussian.pop} Density=Current"
|
||||
|
||||
if cycle > 1:
|
||||
kwords_line += " Guess=Read"
|
||||
|
||||
gaussian_input.append(textwrap.fill(kwords_line, 90))
|
||||
gaussian_input.append("\n")
|
||||
|
||||
gaussian_input.append("\nForce calculation - Cycle number {}\n".format(cycle))
|
||||
gaussian_input.append("\n")
|
||||
gaussian_input.append(
|
||||
f"{self.step.gaussian.chgmult[0]},{self.step.gaussian.chgmult[1]}\n"
|
||||
)
|
||||
|
||||
for atom in self.system.molecule[0].atom:
|
||||
symbol = PTable.get_atomic_symbol(atom.na)
|
||||
gaussian_input.append(
|
||||
"{:<2s} {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
|
||||
symbol, atom.rx, atom.ry, atom.rz
|
||||
)
|
||||
)
|
||||
|
||||
gaussian_input.append("\n")
|
||||
|
||||
for charge in asec_charges:
|
||||
gaussian_input.append(
|
||||
"{:>10.5f} {:>10.5f} {:>10.5f} {:>11.8f}\n".format(
|
||||
charge["rx"], charge["ry"], charge["rz"], charge["chg"]
|
||||
)
|
||||
)
|
||||
|
||||
gaussian_input.append("\n")
|
||||
|
||||
return gaussian_input
|
||||
|
||||
def _run_gaussian(self, cycle: int) -> None:
|
||||
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
|
||||
|
||||
working_dir = os.getcwd()
|
||||
os.chdir(qm_dir)
|
||||
|
||||
infile = "asec.gjf"
|
||||
|
||||
operation = None
|
||||
if self.step.opt:
|
||||
operation = "forces"
|
||||
else:
|
||||
operation = "charges"
|
||||
|
||||
logger.info(
|
||||
f"Calculation of {operation} initiated with Gaussian on {date_time()}\n"
|
||||
)
|
||||
|
||||
if shutil.which("bash") is not None:
|
||||
exit_status = subprocess.call(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
"exec -a {}-step{} {} {}".format(
|
||||
self.step.gaussian.qmprog,
|
||||
cycle,
|
||||
self.step.gaussian.qmprog,
|
||||
infile,
|
||||
),
|
||||
]
|
||||
)
|
||||
else:
|
||||
exit_status = subprocess.call([self.step.gaussian.qmprog, infile])
|
||||
|
||||
if exit_status != 0:
|
||||
raise SystemError("Gaussian process did not exit properly")
|
||||
|
||||
logger.info(f"Calculation of {operation} finished on {date_time()}")
|
||||
|
||||
os.chdir(working_dir)
|
||||
|
||||
def _run_formchk(self, cycle: int):
|
||||
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
|
||||
|
||||
work_dir = os.getcwd()
|
||||
os.chdir(qm_dir)
|
||||
|
||||
logger.info("Formatting the checkpoint file... \n")
|
||||
|
||||
exit_status = subprocess.call(
|
||||
["formchk", "asec.chk"], stdout=subprocess.DEVNULL
|
||||
)
|
||||
|
||||
if exit_status != 0:
|
||||
raise SystemError("Formchk process did not exit properly")
|
||||
|
||||
logger.info("Done\n")
|
||||
|
||||
os.chdir(work_dir)
|
||||
|
||||
def _read_charges_from_fchk(self, cycle: int):
|
||||
fchk_file_path = Path("simfiles", f"step{cycle:02d}", "qm", "asec.fchk")
|
||||
with open(fchk_file_path) as fchk:
|
||||
fchkfile = fchk.readlines()
|
||||
|
||||
if self.step.gaussian.pop in ["chelpg", "mk"]:
|
||||
CHARGE_FLAG = "ESP Charges"
|
||||
else:
|
||||
CHARGE_FLAG = "ESP Charges"
|
||||
|
||||
start = fchkfile.pop(0).strip()
|
||||
while start.find(CHARGE_FLAG) != 0: # expression in begining of line
|
||||
start = fchkfile.pop(0).strip()
|
||||
|
||||
charges: List[float] = []
|
||||
while len(charges) < len(self.system.molecule[0].atom):
|
||||
charges.extend([float(x) for x in fchkfile.pop(0).split()])
|
||||
|
||||
return charges
|
||||
4
diceplayer/logger.py
Normal file
4
diceplayer/logger.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from diceplayer import RunLogger
|
||||
|
||||
|
||||
logger = RunLogger("diceplayer")
|
||||
@@ -1,465 +1,9 @@
|
||||
from diceplayer import VERSION, logger
|
||||
from diceplayer.config.player_config import PlayerConfig
|
||||
from diceplayer.environment import Atom, Molecule, System
|
||||
from diceplayer.interface import DiceInterface, GaussianInterface
|
||||
from diceplayer.utils import PTable, weekday_date_time
|
||||
|
||||
import yaml
|
||||
from pydantic import BaseModel
|
||||
from typing_extensions import Tuple
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ENV = ["OMP_STACKSIZE"]
|
||||
|
||||
|
||||
class Player:
|
||||
def __init__(self, infile: str = None, optimization: bool = False):
|
||||
if infile is None and optimization is False:
|
||||
raise ValueError("Must specify either infile or optimization")
|
||||
def __init__(self, config: PlayerConfig):
|
||||
self.config = config
|
||||
|
||||
elif infile is not None:
|
||||
self.config = self.set_config(self.read_keywords(infile))
|
||||
|
||||
self.system = System()
|
||||
|
||||
self.initial_cycle = 1
|
||||
|
||||
elif optimization is True:
|
||||
save = self.load_run_from_pickle()
|
||||
|
||||
self.config = save[0]
|
||||
|
||||
self.system = save[1]
|
||||
|
||||
self.initial_cycle = save[2] + 1
|
||||
|
||||
else:
|
||||
raise ValueError("Must specify either infile or config")
|
||||
|
||||
self.dice_interface = DiceInterface()
|
||||
self.gaussian_interface = GaussianInterface()
|
||||
|
||||
def start(self):
|
||||
logger.info(
|
||||
"==========================================================================================\n"
|
||||
"Starting the iterative process.\n"
|
||||
"==========================================================================================\n"
|
||||
)
|
||||
|
||||
for cycle in range(self.initial_cycle, self.initial_cycle + self.config.maxcyc):
|
||||
logger.info(
|
||||
f"------------------------------------------------------------------------------------------\n"
|
||||
f" Step # {cycle}\n"
|
||||
f"------------------------------------------------------------------------------------------\n"
|
||||
)
|
||||
|
||||
self.dice_start(cycle)
|
||||
|
||||
try:
|
||||
self.gaussian_start(cycle)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
self.save_run_in_pickle(cycle)
|
||||
|
||||
def prepare_system(self):
|
||||
for i, mol in enumerate(self.system.molecule):
|
||||
logger.info(f"Molecule {i + 1} - {mol.molname}")
|
||||
|
||||
mol.print_mol_info()
|
||||
logger.info(
|
||||
"\n Translating and rotating molecule to standard orientation..."
|
||||
)
|
||||
|
||||
mol.rotate_to_standard_orientation()
|
||||
logger.info("\n Done")
|
||||
logger.info("\nNew values:\n")
|
||||
mol.print_mol_info()
|
||||
|
||||
logger.info("\n")
|
||||
|
||||
def create_simulation_dir(self):
|
||||
simulation_dir_path = Path(self.config.simulation_dir)
|
||||
if simulation_dir_path.exists():
|
||||
raise FileExistsError(
|
||||
f"Error: a file or a directory {self.config.simulation_dir} already exists,"
|
||||
f" move or delete the simfiles directory to continue."
|
||||
)
|
||||
simulation_dir_path.mkdir()
|
||||
|
||||
def create_geoms_file(self):
|
||||
geoms_file_path = Path(self.config.geoms_file)
|
||||
if geoms_file_path.exists():
|
||||
raise FileExistsError(
|
||||
f"Error: a file or a directory {self.config.geoms_file} already exists,"
|
||||
f" move or delete the simfiles directory to continue."
|
||||
)
|
||||
geoms_file_path.touch()
|
||||
|
||||
def print_keywords(self) -> None:
|
||||
def log_keywords(config: BaseModel):
|
||||
for key, value in sorted(config.model_dump().items()):
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(value, list):
|
||||
string = " ".join(str(x) for x in value)
|
||||
logger.info(f"{key} = [ {string} ]")
|
||||
else:
|
||||
logger.info(f"{key} = {value}")
|
||||
|
||||
logger.info(
|
||||
f"##########################################################################################\n"
|
||||
f"############# Welcome to DICEPLAYER version {VERSION} #############\n"
|
||||
f"##########################################################################################\n"
|
||||
)
|
||||
logger.info("Your python version is {}\n".format(sys.version))
|
||||
logger.info("Program started on {}\n".format(weekday_date_time()))
|
||||
logger.info("Environment variables:")
|
||||
for var in ENV:
|
||||
logger.info(
|
||||
"{} = {}\n".format(
|
||||
var, (os.environ[var] if var in os.environ else "Not set")
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"------------------------------------------------------------------------------------------\n"
|
||||
" DICE variables being used in this run:\n"
|
||||
"------------------------------------------------------------------------------------------\n"
|
||||
)
|
||||
|
||||
log_keywords(self.config.dice)
|
||||
|
||||
logger.info(
|
||||
"------------------------------------------------------------------------------------------\n"
|
||||
" GAUSSIAN variables being used in this run:\n"
|
||||
"------------------------------------------------------------------------------------------\n"
|
||||
)
|
||||
|
||||
log_keywords(self.config.gaussian)
|
||||
|
||||
logger.info("\n")
|
||||
|
||||
def read_potentials(self):
|
||||
ljname_path = Path(self.config.dice.ljname)
|
||||
if ljname_path.exists():
|
||||
with open(self.config.dice.ljname) as file:
|
||||
ljc_data = file.readlines()
|
||||
else:
|
||||
raise RuntimeError(f"Potential file {self.config.dice.ljname} not found.")
|
||||
|
||||
combrule = ljc_data.pop(0).split()[0]
|
||||
if combrule not in ("*", "+"):
|
||||
sys.exit(
|
||||
"Error: expected a '*' or a '+' sign in 1st line of file {}".format(
|
||||
self.config.dice.ljname
|
||||
)
|
||||
)
|
||||
self.config.dice.combrule = combrule
|
||||
|
||||
ntypes = ljc_data.pop(0).split()[0]
|
||||
if not ntypes.isdigit():
|
||||
sys.exit(
|
||||
"Error: expected an integer in the 2nd line of file {}".format(
|
||||
self.config.dice.ljname
|
||||
)
|
||||
)
|
||||
ntypes = int(ntypes)
|
||||
|
||||
if ntypes != len(self.config.dice.nmol):
|
||||
sys.exit(
|
||||
f"Error: number of molecule types in file {self.config.dice.ljname} "
|
||||
f"must match that of 'nmol' keyword in config file"
|
||||
)
|
||||
|
||||
for i in range(ntypes):
|
||||
try:
|
||||
nsites, molname = ljc_data.pop(0).split()[:2]
|
||||
except ValueError:
|
||||
raise ValueError(
|
||||
f"Error: expected nsites and molname for the molecule type {i + 1}"
|
||||
)
|
||||
|
||||
if not nsites.isdigit():
|
||||
raise ValueError(
|
||||
f"Error: expected nsites to be an integer for molecule type {i + 1}"
|
||||
)
|
||||
|
||||
nsites = int(nsites)
|
||||
self.system.add_type(Molecule(molname))
|
||||
|
||||
atom_fields = ["lbl", "na", "rx", "ry", "rz", "chg", "eps", "sig"]
|
||||
for j in range(nsites):
|
||||
new_atom = dict(zip(atom_fields, ljc_data.pop(0).split()))
|
||||
self.system.molecule[i].add_atom(
|
||||
Atom(**self.validate_atom_dict(i, j, new_atom))
|
||||
)
|
||||
|
||||
def print_potentials(self) -> None:
|
||||
formatstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f} {:>9.4f}"
|
||||
logger.info(
|
||||
"==========================================================================================\n"
|
||||
f" Potential parameters from file {self.config.dice.ljname}:\n"
|
||||
"------------------------------------------------------------------------------------------"
|
||||
"\n"
|
||||
)
|
||||
|
||||
logger.info(f"Combination rule: {self.config.dice.combrule}")
|
||||
logger.info(f"Types of molecules: {len(self.system.molecule)}\n")
|
||||
|
||||
i = 0
|
||||
for mol in self.system.molecule:
|
||||
i += 1
|
||||
logger.info("{} atoms in molecule type {}:".format(len(mol.atom), i))
|
||||
logger.info(
|
||||
"---------------------------------------------------------------------------------"
|
||||
)
|
||||
logger.info(
|
||||
"Lbl AN X Y Z Charge Epsilon Sigma Mass"
|
||||
)
|
||||
logger.info(
|
||||
"---------------------------------------------------------------------------------"
|
||||
)
|
||||
|
||||
for atom in mol.atom:
|
||||
logger.info(
|
||||
formatstr.format(
|
||||
atom.lbl,
|
||||
atom.na,
|
||||
atom.rx,
|
||||
atom.ry,
|
||||
atom.rz,
|
||||
atom.chg,
|
||||
atom.eps,
|
||||
atom.sig,
|
||||
atom.mass,
|
||||
)
|
||||
)
|
||||
|
||||
logger.info("\n")
|
||||
|
||||
def dice_start(self, cycle: int):
|
||||
self.dice_interface.configure(
|
||||
self.config,
|
||||
self.system,
|
||||
)
|
||||
|
||||
self.dice_interface.start(cycle)
|
||||
|
||||
self.dice_interface.reset()
|
||||
|
||||
def gaussian_start(self, cycle: int):
|
||||
self.gaussian_interface.configure(
|
||||
self.config,
|
||||
self.system,
|
||||
)
|
||||
|
||||
result = self.gaussian_interface.start(cycle)
|
||||
|
||||
self.gaussian_interface.reset()
|
||||
|
||||
if self.config.opt:
|
||||
if "position" not in result:
|
||||
raise RuntimeError("Optimization failed. No position found in result.")
|
||||
|
||||
else:
|
||||
if "charges" not in result:
|
||||
raise RuntimeError(
|
||||
"Charges optimization failed. No charges found in result."
|
||||
)
|
||||
|
||||
diff = self.system.molecule[0].update_charges(result["charges"])
|
||||
|
||||
self.print_charges_and_dipole(cycle)
|
||||
self.print_geoms(cycle)
|
||||
|
||||
if diff < self.config.gaussian.chg_tol:
|
||||
logger.info(f"Charges converged after {cycle} cycles.")
|
||||
raise StopIteration()
|
||||
|
||||
def print_charges_and_dipole(self, cycle: int) -> None:
|
||||
"""
|
||||
Print the charges and dipole of the molecule in the Output file
|
||||
|
||||
Args:
|
||||
cycle (int): Number of the cycle
|
||||
fh (TextIO): Output file
|
||||
"""
|
||||
|
||||
logger.info("Cycle # {}\n".format(cycle))
|
||||
logger.info("Number of site: {}\n".format(len(self.system.molecule[0].atom)))
|
||||
|
||||
chargesAndDipole = self.system.molecule[0].charges_and_dipole()
|
||||
|
||||
logger.info(
|
||||
"{:>10.6f} {:>10.6f} {:>10.6f} {:>10.6f} {:>10.6f}\n".format(
|
||||
chargesAndDipole[0],
|
||||
chargesAndDipole[1],
|
||||
chargesAndDipole[2],
|
||||
chargesAndDipole[3],
|
||||
chargesAndDipole[4],
|
||||
)
|
||||
)
|
||||
|
||||
def print_geoms(self, cycle: int):
|
||||
with open(self.config.geoms_file, "a") as file:
|
||||
file.write(f"Cycle # {cycle}\n")
|
||||
|
||||
for atom in self.system.molecule[0].atom:
|
||||
symbol = PTable.get_atomic_symbol(atom.na)
|
||||
file.write(
|
||||
f"{symbol:<2s} {atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n"
|
||||
)
|
||||
|
||||
file.write("\n")
|
||||
|
||||
@staticmethod
|
||||
def validate_atom_dict(molecule_type, molecule_site, atom_dict: dict) -> dict:
|
||||
molecule_type += 1
|
||||
molecule_site += 1
|
||||
|
||||
if len(atom_dict) < 8:
|
||||
raise ValueError(
|
||||
f"Invalid number of fields for site {molecule_site} for molecule type {molecule_type}."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["lbl"] = int(atom_dict["lbl"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid lbl fields for site {molecule_site} for molecule type {molecule_type}."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["na"] = int(atom_dict["na"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid na fields for site {molecule_site} for molecule type {molecule_type}."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["rx"] = float(atom_dict["rx"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid rx fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["ry"] = float(atom_dict["ry"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid ry fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["rz"] = float(atom_dict["rz"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid rz fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["chg"] = float(atom_dict["chg"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid chg fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["eps"] = float(atom_dict["eps"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid eps fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
try:
|
||||
atom_dict["sig"] = float(atom_dict["sig"])
|
||||
except Exception:
|
||||
raise ValueError(
|
||||
f"Invalid sig fields for site {molecule_site} for molecule type {molecule_type}. "
|
||||
f"Value must be a float."
|
||||
)
|
||||
|
||||
return atom_dict
|
||||
|
||||
def print_results(self):
|
||||
formatstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f} {:>9.4f}"
|
||||
|
||||
mol = self.system.molecule[0]
|
||||
logger.info("{} atoms in molecule type {}:".format(len(mol.atom), 1))
|
||||
logger.info(
|
||||
"---------------------------------------------------------------------------------"
|
||||
)
|
||||
logger.info(
|
||||
"Lbl AN X Y Z Charge Epsilon Sigma Mass"
|
||||
)
|
||||
logger.info(
|
||||
"---------------------------------------------------------------------------------"
|
||||
)
|
||||
|
||||
for atom in mol.atom:
|
||||
logger.info(
|
||||
formatstr.format(
|
||||
atom.lbl,
|
||||
atom.na,
|
||||
atom.rx,
|
||||
atom.ry,
|
||||
atom.rz,
|
||||
atom.chg,
|
||||
atom.eps,
|
||||
atom.sig,
|
||||
atom.mass,
|
||||
)
|
||||
)
|
||||
|
||||
logger.info("\n")
|
||||
|
||||
def save_run_in_pickle(self, cycle):
|
||||
try:
|
||||
with open("latest-step.pkl", "wb") as pickle_file:
|
||||
pickle.dump((self.config, self.system, cycle), pickle_file)
|
||||
except Exception:
|
||||
raise RuntimeError("Could not save pickle file latest-step.pkl.")
|
||||
|
||||
@staticmethod
|
||||
def load_run_from_pickle() -> Tuple[PlayerConfig, System, int]:
|
||||
pickle_path = Path("latest-step.pkl")
|
||||
try:
|
||||
with open(pickle_path, "rb") as pickle_file:
|
||||
save = pickle.load(pickle_file)
|
||||
return save[0], save[1], save[2] + 1
|
||||
|
||||
except Exception:
|
||||
raise RuntimeError(f"Could not load pickle file {pickle_path}.")
|
||||
|
||||
@staticmethod
|
||||
def set_config(data: dict) -> PlayerConfig:
|
||||
return PlayerConfig.model_validate(data)
|
||||
|
||||
@staticmethod
|
||||
def read_keywords(infile) -> dict:
|
||||
with open(infile, "r") as yml_file:
|
||||
config = yaml.load(yml_file, Loader=yaml.SafeLoader)
|
||||
|
||||
if "diceplayer" in config:
|
||||
return config.get("diceplayer")
|
||||
|
||||
raise RuntimeError(f"Could not find diceplayer section in {infile}.")
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, infile: str) -> "Player":
|
||||
return cls(infile=infile)
|
||||
|
||||
@classmethod
|
||||
def from_save(cls):
|
||||
return cls(optimization=True)
|
||||
def play(self, continuation = False):
|
||||
...
|
||||
@@ -1,4 +1,4 @@
|
||||
from .logger import Logger, valid_logger
|
||||
from .logger import RunLogger
|
||||
from .misc import (
|
||||
compress_files_1mb,
|
||||
date_time,
|
||||
@@ -10,8 +10,7 @@ from .ptable import AtomInfo, PTable
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Logger",
|
||||
"valid_logger",
|
||||
"RunLogger",
|
||||
"PTable",
|
||||
"AtomInfo",
|
||||
"weekday_date_time",
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
from typing_extensions import Protocol, runtime_checkable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class Dataclass(Protocol):
|
||||
__dataclass_fields__: dict
|
||||
@@ -1,72 +1,48 @@
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing_extensions import TypeVar
|
||||
|
||||
H = TypeVar('H', bound=logging.Handler)
|
||||
|
||||
|
||||
def valid_logger(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
logger = args[0]
|
||||
assert logger._was_set, "Logger is not set. Please call set_logger() first."
|
||||
class RunLogger(logging.Logger):
|
||||
def __init__(self, name, level=logging.INFO, stream=sys.stdout):
|
||||
super().__init__(name, level)
|
||||
|
||||
return func(*args, **kwargs)
|
||||
self.handlers.clear()
|
||||
|
||||
return wrapper
|
||||
self.handlers.append(
|
||||
self._configure_handler(logging.StreamHandler(stream), level)
|
||||
)
|
||||
|
||||
|
||||
class Logger:
|
||||
outfile = None
|
||||
def set_output_file(self, outfile: Path, level=logging.INFO):
|
||||
for handler in list(self.handlers):
|
||||
if not isinstance(handler, logging.FileHandler):
|
||||
continue
|
||||
self.handlers.remove(handler)
|
||||
|
||||
_logger = None
|
||||
self.handlers.append(
|
||||
self._create_file_handler(outfile, level)
|
||||
)
|
||||
|
||||
_was_set = False
|
||||
|
||||
def __init__(self, logger_name):
|
||||
if self._logger is None:
|
||||
self._logger = logging.getLogger(logger_name)
|
||||
|
||||
def set_logger(self, outfile="run.log", level=logging.INFO, stream=None):
|
||||
outfile_path = None
|
||||
if outfile is not None and stream is None:
|
||||
outfile_path = Path(outfile)
|
||||
if outfile_path.exists():
|
||||
outfile_path.rename(str(outfile_path) + ".backup")
|
||||
@staticmethod
|
||||
def _create_file_handler(file: str|Path, level) -> logging.FileHandler:
|
||||
file = Path(file)
|
||||
|
||||
if level is not None:
|
||||
self._logger.setLevel(level)
|
||||
if file.exists():
|
||||
file.rename(file.with_suffix('.log.backup'))
|
||||
|
||||
self._create_handlers(outfile_path, stream)
|
||||
handler = logging.FileHandler(file)
|
||||
return RunLogger._configure_handler(handler, level)
|
||||
|
||||
self._was_set = True
|
||||
|
||||
@valid_logger
|
||||
def info(self, message):
|
||||
self._logger.info(message)
|
||||
|
||||
@valid_logger
|
||||
def debug(self, message):
|
||||
self._logger.debug(message)
|
||||
|
||||
@valid_logger
|
||||
def warning(self, message):
|
||||
self._logger.warning(message)
|
||||
|
||||
@valid_logger
|
||||
def error(self, message):
|
||||
self._logger.error(message)
|
||||
|
||||
def _create_handlers(self, outfile_path: Path, stream):
|
||||
handlers = []
|
||||
if outfile_path is not None:
|
||||
handlers.append(logging.FileHandler(outfile_path, mode="a+"))
|
||||
elif stream is not None:
|
||||
handlers.append(logging.StreamHandler(stream))
|
||||
else:
|
||||
handlers.append(logging.StreamHandler())
|
||||
|
||||
for handler in handlers:
|
||||
handler.setFormatter(logging.Formatter("%(message)s"))
|
||||
self._logger.addHandler(handler)
|
||||
|
||||
def close(self):
|
||||
for handler in self._logger.handlers:
|
||||
handler.close()
|
||||
self._logger.removeHandler(handler)
|
||||
@staticmethod
|
||||
def _configure_handler(handler: H, level) -> H:
|
||||
handler.setLevel(level)
|
||||
formatter = logging.Formatter('%(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
return handler
|
||||
Reference in New Issue
Block a user