Adds Formatter to Project

This commit is contained in:
2023-06-11 16:04:25 -03:00
parent 82f3092f3e
commit c4dae5e8d1
29 changed files with 1151 additions and 721 deletions

View File

@@ -7,10 +7,7 @@ from abc import ABC, abstractmethod
class Interface(ABC):
__slots__ = [
'step',
'system'
]
__slots__ = ["step", "system"]
def __init__(self):
self.system: System | None = None

View File

@@ -1,20 +1,21 @@
from __future__ import annotations
from diceplayer import logger
from diceplayer.shared.config.player_config import PlayerConfig
from diceplayer.shared.environment.system import System
from diceplayer.shared.interface import Interface
from diceplayer import logger
from multiprocessing import Process, connection
from setproctitle import setproctitle
from typing import Final, TextIO
from pathlib import Path
import subprocess
import shutil
import random
import time
import sys
import os
import random
import shutil
import subprocess
import sys
import time
from multiprocessing import Process, connection
from pathlib import Path
from typing import Final, TextIO
DICE_END_FLAG: Final[str] = "End of simulation"
DICE_FLAG_LINE: Final[int] = -2
@@ -74,19 +75,11 @@ class DiceInterface(Interface):
if not simulation_dir.exists():
simulation_dir.mkdir(parents=True)
proc_dir = Path(
simulation_dir,
f"step{cycle:02d}",
f"p{proc:02d}"
)
proc_dir = Path(simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
proc_dir.mkdir(parents=True, exist_ok=True)
def _make_dice_inputs(self, cycle, proc):
proc_dir = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
f"p{proc:02d}"
)
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
self._make_potentials(proc_dir)
@@ -94,17 +87,17 @@ class DiceInterface(Interface):
# This is logic is used to make the initial configuration file
# for the next cycle using the last.xyz file from the previous cycle.
if self.step.dice.randominit == 'first' and cycle > 1:
if self.step.dice.randominit == "first" and cycle > 1:
last_xyz = Path(
self.step.simulation_dir,
f"step{(cycle - 1):02d}",
f"p{proc:02d}",
"last.xyz"
"last.xyz",
)
if not last_xyz.exists():
raise FileNotFoundError(f"File {last_xyz} not found.")
with open(last_xyz, 'r') as last_xyz_file:
with open(last_xyz, "r") as last_xyz_file:
self._make_init_file(proc_dir, last_xyz_file)
last_xyz_file.seek(0)
self.step.dice.dens = self._new_density(last_xyz_file)
@@ -122,11 +115,7 @@ class DiceInterface(Interface):
def _run_dice(self, cycle: int, proc: int):
working_dir = os.getcwd()
proc_dir = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
f"p{proc:02d}"
)
proc_dir = Path(self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}")
logger.info(
f"Simulation process {str(proc_dir)} initiated with pid {os.getpid()}"
@@ -134,7 +123,7 @@ class DiceInterface(Interface):
os.chdir(proc_dir)
if not (self.step.dice.randominit == 'first' and cycle > 1):
if not (self.step.dice.randominit == "first" and cycle > 1):
self.run_dice_file(cycle, proc, "NVT.ter")
if len(self.step.dice.nstep) == 2:
@@ -168,17 +157,16 @@ class DiceInterface(Interface):
SECONDARY_MOLECULE_LENGTH = 0
for i in range(1, len(self.step.dice.nmol)):
SECONDARY_MOLECULE_LENGTH += self.step.dice.nmol[i] * len(self.system.molecule[i].atom)
SECONDARY_MOLECULE_LENGTH += self.step.dice.nmol[i] * len(
self.system.molecule[i].atom
)
xyz_lines = xyz_lines[-SECONDARY_MOLECULE_LENGTH:]
input_file = Path(proc_dir, self.step.dice.outname + ".xy")
with open(input_file, 'w') as f:
with open(input_file, "w") as f:
for atom in self.system.molecule[0].atom:
f.write(
f"{atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n"
)
f.write(f"{atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n")
for line in xyz_lines:
atom = line.split()
@@ -205,7 +193,7 @@ class DiceInterface(Interface):
def _make_nvt_ter(self, cycle, proc_dir):
file = Path(proc_dir, "NVT.ter")
with open(file, 'w') as f:
with open(file, "w") as f:
f.write(f"title = {self.title} - NVT Thermalization\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
@@ -236,9 +224,8 @@ class DiceInterface(Interface):
f.write(f"upbuf = {self.step.dice.upbuf}")
def _make_nvt_eq(self, cycle, proc_dir):
file = Path(proc_dir, "NVT.eq")
with open(file, 'w') as f:
with open(file, "w") as f:
f.write(f"title = {self.title} - NVT Production\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
@@ -269,9 +256,8 @@ class DiceInterface(Interface):
f.write("seed = {}\n".format(seed))
def _make_npt_ter(self, cycle, proc_dir):
file = Path(proc_dir, "NPT.ter")
with open(file, 'w') as f:
with open(file, "w") as f:
f.write(f"title = {self.title} - NPT Thermalization\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
@@ -303,7 +289,7 @@ class DiceInterface(Interface):
def _make_npt_eq(self, proc_dir):
file = Path(proc_dir, "NPT.eq")
with open(file, 'w') as f:
with open(file, "w") as f:
f.write(f"title = {self.title} - NPT Production\n")
f.write(f"ncores = {self.step.ncores}\n")
f.write(f"ljname = {self.step.dice.ljname}\n")
@@ -332,7 +318,7 @@ class DiceInterface(Interface):
fstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f}\n"
file = Path(proc_dir, self.step.dice.ljname)
with open(file, 'w') as f:
with open(file, "w") as f:
f.write(f"{self.step.dice.combrule}\n")
f.write(f"{len(self.step.dice.nmol)}\n")
@@ -370,7 +356,9 @@ class DiceInterface(Interface):
)
def run_dice_file(self, cycle: int, proc: int, file_name: str):
with open(Path(file_name), 'r') as infile, open(Path(file_name + ".out"), 'w') as outfile:
with open(Path(file_name), "r") as infile, open(
Path(file_name + ".out"), "w"
) as outfile:
if shutil.which("bash") is not None:
exit_status = subprocess.call(
[
@@ -385,11 +373,15 @@ class DiceInterface(Interface):
)
if exit_status != 0:
raise RuntimeError(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly")
raise RuntimeError(
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
)
with open(Path(file_name + ".out"), 'r') as outfile:
with open(Path(file_name + ".out"), "r") as outfile:
flag = outfile.readlines()[DICE_FLAG_LINE].strip()
if flag != DICE_END_FLAG:
raise RuntimeError(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly")
raise RuntimeError(
f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly"
)
logger.info(f"Dice {file_name} - step{cycle:02d}-p{proc:02d} exited properly")

View File

@@ -1,24 +1,23 @@
from __future__ import annotations
from diceplayer import logger
from diceplayer.shared.config.player_config import PlayerConfig
from diceplayer.shared.environment.atom import Atom
from diceplayer.shared.environment.molecule import Molecule
from diceplayer.shared.environment.system import System
from diceplayer.shared.environment.atom import Atom
from diceplayer.shared.interface import Interface
from diceplayer.shared.utils.misc import date_time
from diceplayer.shared.utils.ptable import atomsymb
from diceplayer.shared.interface import Interface
from diceplayer import logger
from typing import Tuple, List, Dict, Any
from nptyping import NDArray
import numpy as np
from nptyping import NDArray
from pathlib import Path
import os
import shutil
import subprocess
import textwrap
import shutil
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple
class GaussianInterface(Interface):
@@ -33,10 +32,7 @@ class GaussianInterface(Interface):
self._copy_chk_file_from_previous_step(cycle)
asec_charges = self.populate_asec_vdw(cycle)
self._make_gaussian_input_file(
cycle,
asec_charges
)
self._make_gaussian_input_file(cycle, asec_charges)
self._run_gaussian(cycle)
self._run_formchk(cycle)
@@ -49,9 +45,7 @@ class GaussianInterface(Interface):
raise NotImplementedError("Optimization not implemented yet.")
else:
return_value['charges'] = np.array(
self._read_charges_from_fchk(cycle)
)
return_value["charges"] = np.array(self._read_charges_from_fchk(cycle))
return return_value
@@ -60,36 +54,22 @@ class GaussianInterface(Interface):
del self.system
def _make_qm_dir(self, cycle: int):
qm_dir_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm"
)
qm_dir_path = Path(self.step.simulation_dir, f"step{cycle:02d}", "qm")
if not qm_dir_path.exists():
qm_dir_path.mkdir()
def _copy_chk_file_from_previous_step(self, cycle: int):
current_chk_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm",
f"asec.chk"
self.step.simulation_dir, f"step{cycle:02d}", "qm", f"asec.chk"
)
if current_chk_file_path.exists():
raise FileExistsError(
f"File {current_chk_file_path} already exists."
)
raise FileExistsError(f"File {current_chk_file_path} already exists.")
previous_chk_file_path = Path(
self.step.simulation_dir,
f"step{(cycle - 1):02d}",
"qm",
f"asec.chk"
self.step.simulation_dir, f"step{(cycle - 1):02d}", "qm", f"asec.chk"
)
if not previous_chk_file_path.exists():
raise FileNotFoundError(
f"File {previous_chk_file_path} does not exist."
)
raise FileNotFoundError(f"File {previous_chk_file_path} does not exist.")
shutil.copy(previous_chk_file_path, current_chk_file_path)
@@ -104,17 +84,19 @@ class GaussianInterface(Interface):
for proc in range(1, self.step.nprocs + 1):
proc_charges.append(self._read_charges_from_last_step(cycle, proc))
asec_charges, thickness, picked_mols = \
self._evaluate_proc_charges(nsites_total, proc_charges)
asec_charges, thickness, picked_mols = self._evaluate_proc_charges(
nsites_total, proc_charges
)
logger.info(f"In average, {(sum(picked_mols) / norm_factor):^7.2f} molecules\n"
f"were selected from each of the {len(picked_mols)} configurations\n"
f"of the production simulations to form the ASEC, comprising a shell with\n"
f"minimum thickness of {(sum(thickness) / norm_factor):>6.2f} Angstrom\n"
)
logger.info(
f"In average, {(sum(picked_mols) / norm_factor):^7.2f} molecules\n"
f"were selected from each of the {len(picked_mols)} configurations\n"
f"of the production simulations to form the ASEC, comprising a shell with\n"
f"minimum thickness of {(sum(thickness) / norm_factor):>6.2f} Angstrom\n"
)
for charge in asec_charges:
charge['chg'] = charge['chg'] / norm_factor
charge["chg"] = charge["chg"] / norm_factor
return asec_charges
@@ -135,23 +117,19 @@ class GaussianInterface(Interface):
def _read_charges_from_last_step(self, cycle: int, proc: int) -> list[str]:
last_xyz_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
f"p{proc:02d}",
"last.xyz"
self.step.simulation_dir, f"step{cycle:02d}", f"p{proc:02d}", "last.xyz"
)
if not last_xyz_file_path.exists():
raise FileNotFoundError(
f"File {last_xyz_file_path} does not exist."
)
raise FileNotFoundError(f"File {last_xyz_file_path} does not exist.")
with open(last_xyz_file_path, 'r') as last_xyz_file:
with open(last_xyz_file_path, "r") as last_xyz_file:
lines = last_xyz_file.readlines()
return lines
def _evaluate_proc_charges(self, total_nsites: int, proc_charges: list[list[str]]) -> Tuple[
List[Dict[str, float | Any]], List[float], List[int]]:
def _evaluate_proc_charges(
self, total_nsites: int, proc_charges: list[list[str]]
) -> Tuple[List[Dict[str, float | Any]], List[float], List[int]]:
asec_charges = []
thickness = []
@@ -164,9 +142,7 @@ class GaussianInterface(Interface):
f"Number of sites does not match total number of sites."
)
thickness.append(
self._calculate_proc_thickness(charges)
)
thickness.append(self._calculate_proc_thickness(charges))
nsites_ref_mol = len(self.system.molecule[0].atom)
charges = charges[nsites_ref_mol:]
@@ -183,7 +159,12 @@ class GaussianInterface(Interface):
for site in range(len(self.system.molecule[type].atom)):
line = charges.pop(0).split()
if line[0].title() != atomsymb[self.system.molecule[type].atom[site].na].strip():
if (
line[0].title()
!= atomsymb[
self.system.molecule[type].atom[site].na
].strip()
):
raise SyntaxError(
f"Error: Invalid Dice Output. Atom type does not match."
)
@@ -201,13 +182,18 @@ class GaussianInterface(Interface):
)
)
distance = self.system.molecule[0] \
.minimum_distance(new_molecule)
distance = self.system.molecule[0].minimum_distance(new_molecule)
if distance < thickness[-1]:
for atom in new_molecule.atom:
asec_charges.append(
{"lbl": atomsymb[atom.na], "rx": atom.rx, "ry": atom.ry, "rz": atom.rz, "chg": atom.chg}
{
"lbl": atomsymb[atom.na],
"rx": atom.rx,
"ry": atom.ry,
"rz": atom.rz,
"chg": atom.chg,
}
)
mol_count += 1
@@ -230,18 +216,17 @@ class GaussianInterface(Interface):
def _make_gaussian_input_file(self, cycle: int, asec_charges: list[dict]) -> None:
gaussian_input_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm",
f"asec.gjf"
self.step.simulation_dir, f"step{cycle:02d}", "qm", f"asec.gjf"
)
with open(gaussian_input_file_path, 'w') as gaussian_input_file:
with open(gaussian_input_file_path, "w") as gaussian_input_file:
gaussian_input_file.writelines(
self._generate_gaussian_input(cycle, asec_charges)
)
def _generate_gaussian_input(self, cycle: int, asec_charges: list[dict]) -> list[str]:
def _generate_gaussian_input(
self, cycle: int, asec_charges: list[dict]
) -> list[str]:
gaussian_input = ["%Chk=asec.chk\n"]
if self.step.mem is not None:
@@ -268,7 +253,9 @@ class GaussianInterface(Interface):
gaussian_input.append("\nForce calculation - Cycle number {}\n".format(cycle))
gaussian_input.append("\n")
gaussian_input.append(f"{self.step.gaussian.chgmult[0]},{self.step.gaussian.chgmult[1]}\n")
gaussian_input.append(
f"{self.step.gaussian.chgmult[0]},{self.step.gaussian.chgmult[1]}\n"
)
for atom in self.system.molecule[0].atom:
symbol = atomsymb[atom.na]
@@ -283,7 +270,7 @@ class GaussianInterface(Interface):
for charge in asec_charges:
gaussian_input.append(
"{:>10.5f} {:>10.5f} {:>10.5f} {:>11.8f}\n".format(
charge['rx'], charge['ry'], charge['rz'], charge['chg']
charge["rx"], charge["ry"], charge["rz"], charge["chg"]
)
)
@@ -292,11 +279,7 @@ class GaussianInterface(Interface):
return gaussian_input
def _run_gaussian(self, cycle: int) -> None:
qm_dir = Path(
self.step.simulation_dir,
f"step{(cycle):02d}",
"qm"
)
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
working_dir = os.getcwd()
os.chdir(qm_dir)
@@ -319,7 +302,10 @@ class GaussianInterface(Interface):
"bash",
"-c",
"exec -a {}-step{} {} {}".format(
self.step.gaussian.qmprog, cycle, self.step.gaussian.qmprog, infile
self.step.gaussian.qmprog,
cycle,
self.step.gaussian.qmprog,
infile,
),
]
)
@@ -334,18 +320,16 @@ class GaussianInterface(Interface):
os.chdir(working_dir)
def _run_formchk(self, cycle: int):
qm_dir = Path(
self.step.simulation_dir,
f"step{(cycle):02d}",
"qm"
)
qm_dir = Path(self.step.simulation_dir, f"step{(cycle):02d}", "qm")
work_dir = os.getcwd()
os.chdir(qm_dir)
logger.info("Formatting the checkpoint file... \n")
exit_status = subprocess.call(["formchk", "asec.chk"], stdout=subprocess.DEVNULL)
exit_status = subprocess.call(
["formchk", "asec.chk"], stdout=subprocess.DEVNULL
)
if exit_status != 0:
raise SystemError("Formchk process did not exit properly")
@@ -355,12 +339,7 @@ class GaussianInterface(Interface):
os.chdir(work_dir)
def _read_charges_from_fchk(self, cycle: int):
fchk_file_path = Path(
"simfiles",
f"step{cycle:02d}",
"qm",
"asec.fchk"
)
fchk_file_path = Path("simfiles", f"step{cycle:02d}", "qm", "asec.fchk")
with open(fchk_file_path) as fchk:
fchkfile = fchk.readlines()