Files
DicePlayer/diceplayer/shared/interface/gaussian_interface.py

381 lines
12 KiB
Python

from __future__ import annotations
from diceplayer.shared.config.player_config import PlayerConfig
from diceplayer.shared.environment.molecule import Molecule
from diceplayer.shared.environment.system import System
from diceplayer.shared.environment.atom import Atom
from diceplayer.shared.utils.misc import date_time
from diceplayer.shared.utils.ptable import atomsymb
from diceplayer.shared.interface import Interface
from diceplayer import logger
from typing import Tuple, List, Dict, Any
from nptyping import NDArray
import numpy as np
from pathlib import Path
import subprocess
import textwrap
import shutil
import os
class GaussianInterface(Interface):
def configure(self, step_dto: PlayerConfig, system: System):
self.system = system
self.step = step_dto
def start(self, cycle: int) -> Dict[str, NDArray]:
self._make_qm_dir(cycle)
if cycle > 1:
self._copy_chk_file_from_previous_step(cycle)
asec_charges = self.populate_asec_vdw(cycle)
self._make_gaussian_input_file(
cycle,
asec_charges
)
self._run_gaussian(cycle)
self._run_formchk(cycle)
return_value = {}
if self.step.opt:
# return_value['position'] = np.array(
# self._run_optimization(cycle)
# )
raise NotImplementedError("Optimization not implemented yet.")
else:
return_value['charges'] = np.array(
self._read_charges_from_fchk(cycle)
)
return return_value
def reset(self):
del self.step
del self.system
def _make_qm_dir(self, cycle: int):
qm_dir_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm"
)
if not qm_dir_path.exists():
qm_dir_path.mkdir()
def _copy_chk_file_from_previous_step(self, cycle: int):
current_chk_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm",
f"asec.chk"
)
if current_chk_file_path.exists():
raise FileExistsError(
f"File {current_chk_file_path} already exists."
)
previous_chk_file_path = Path(
self.step.simulation_dir,
f"step{(cycle - 1):02d}",
"qm",
f"asec.chk"
)
if not previous_chk_file_path.exists():
raise FileNotFoundError(
f"File {previous_chk_file_path} does not exist."
)
shutil.copy(previous_chk_file_path, current_chk_file_path)
def populate_asec_vdw(self, cycle: int) -> list[dict]:
norm_factor = self._calculate_norm_factor()
nsitesref = len(self.system.molecule[0].atom)
nsites_total = self._calculate_total_number_of_sites(nsitesref)
proc_charges = []
for proc in range(1, self.step.nprocs + 1):
proc_charges.append(self._read_charges_from_last_step(cycle, proc))
asec_charges, thickness, picked_mols = \
self._evaluate_proc_charges(nsites_total, proc_charges)
logger.info(f"In average, {(sum(picked_mols) / norm_factor):^7.2f} molecules\n"
f"were selected from each of the {len(picked_mols)} configurations\n"
f"of the production simulations to form the ASEC, comprising a shell with\n"
f"minimum thickness of {(sum(thickness) / norm_factor):>6.2f} Angstrom\n"
)
for charge in asec_charges:
charge['chg'] = charge['chg'] / norm_factor
return asec_charges
def _calculate_norm_factor(self) -> int:
if self.step.dice.nstep[-1] % self.step.dice.isave == 0:
nconfigs = round(self.step.dice.nstep[-1] / self.step.dice.isave)
else:
nconfigs = int(self.step.dice.nstep[-1] / self.step.dice.isave)
return nconfigs * self.step.nprocs
def _calculate_total_number_of_sites(self, nsitesref) -> int:
nsites_total = self.step.dice.nmol[0] * nsitesref
for i in range(1, len(self.step.dice.nmol)):
nsites_total += self.step.dice.nmol[i] * len(self.system.molecule[i].atom)
return nsites_total
def _read_charges_from_last_step(self, cycle: int, proc: int) -> list[str]:
last_xyz_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
f"p{proc:02d}",
"last.xyz"
)
if not last_xyz_file_path.exists():
raise FileNotFoundError(
f"File {last_xyz_file_path} does not exist."
)
with open(last_xyz_file_path, 'r') as last_xyz_file:
lines = last_xyz_file.readlines()
return lines
def _evaluate_proc_charges(self, total_nsites: int, proc_charges: list[list[str]]) -> Tuple[
List[Dict[str, float | Any]], List[float], List[int]]:
asec_charges = []
thickness = []
picked_mols = []
for charges in proc_charges:
charges_nsites = int(charges.pop(0))
if int(charges_nsites) != total_nsites:
raise ValueError(
f"Number of sites does not match total number of sites."
)
thickness.append(
self._calculate_proc_thickness(charges)
)
nsites_ref_mol = len(self.system.molecule[0].atom)
charges = charges[nsites_ref_mol:]
mol_count = 0
for type in range(len(self.step.dice.nmol)):
if type == 0:
# Reference Molecule must be ignored from type 0
nmols = self.step.dice.nmol[type] - 1
else:
nmols = self.step.dice.nmol[type]
for mol in range(nmols):
new_molecule = Molecule("ASEC TMP MOLECULE")
for site in range(len(self.system.molecule[type].atom)):
line = charges.pop(0).split()
if line[0].title() != atomsymb[self.system.molecule[type].atom[site].na].strip():
raise SyntaxError(
f"Error: Invalid Dice Output. Atom type does not match."
)
new_molecule.add_atom(
Atom(
self.system.molecule[type].atom[site].lbl,
self.system.molecule[type].atom[site].na,
float(line[1]),
float(line[2]),
float(line[3]),
self.system.molecule[type].atom[site].chg,
self.system.molecule[type].atom[site].eps,
self.system.molecule[type].atom[site].sig,
)
)
distance = self.system.molecule[0] \
.minimum_distance(new_molecule)
if distance < thickness[-1]:
for atom in new_molecule.atom:
asec_charges.append(
{"lbl": atomsymb[atom.na], "rx": atom.rx, "ry": atom.ry, "rz": atom.rz, "chg": atom.chg}
)
mol_count += 1
picked_mols.append(mol_count)
return asec_charges, thickness, picked_mols
def _calculate_proc_thickness(self, charges: list[str]) -> float:
box = charges.pop(0).split()[-3:]
box = [float(box[0]), float(box[1]), float(box[2])]
sizes = self.system.molecule[0].sizes_of_molecule()
return min(
[
(box[0] - sizes[0]) / 2,
(box[1] - sizes[1]) / 2,
(box[2] - sizes[2]) / 2,
]
)
def _make_gaussian_input_file(self, cycle: int, asec_charges: list[dict]) -> None:
gaussian_input_file_path = Path(
self.step.simulation_dir,
f"step{cycle:02d}",
"qm",
f"asec.gjf"
)
with open(gaussian_input_file_path, 'w') as gaussian_input_file:
gaussian_input_file.writelines(
self._generate_gaussian_input(cycle, asec_charges)
)
def _generate_gaussian_input(self, cycle: int, asec_charges: list[dict]) -> list[str]:
gaussian_input = ["%Chk=asec.chk\n"]
if self.step.mem is not None:
gaussian_input.append(f"%Mem={self.step.mem}GB\n")
gaussian_input.append(f"%Nprocs={self.step.nprocs * self.step.ncores}\n")
kwords_line = f"#P {self.step.gaussian.level}"
if self.step.gaussian.keywords:
kwords_line += " " + self.step.gaussian.keywords
if self.step.opt == "yes":
kwords_line += " Force"
kwords_line += " NoSymm"
kwords_line += f" Pop={self.step.gaussian.pop} Density=Current"
if cycle > 1:
kwords_line += " Guess=Read"
gaussian_input.append(textwrap.fill(kwords_line, 90))
gaussian_input.append("\n")
gaussian_input.append("\nForce calculation - Cycle number {}\n".format(cycle))
gaussian_input.append("\n")
gaussian_input.append(f"{self.step.gaussian.chgmult[0]},{self.step.gaussian.chgmult[1]}\n")
for atom in self.system.molecule[0].atom:
symbol = atomsymb[atom.na]
gaussian_input.append(
"{:<2s} {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
symbol, atom.rx, atom.ry, atom.rz
)
)
gaussian_input.append("\n")
for charge in asec_charges:
gaussian_input.append(
"{:>10.5f} {:>10.5f} {:>10.5f} {:>11.8f}\n".format(
charge['rx'], charge['ry'], charge['rz'], charge['chg']
)
)
gaussian_input.append("\n")
return gaussian_input
def _run_gaussian(self, cycle: int) -> None:
qm_dir = Path(
self.step.simulation_dir,
f"step{(cycle):02d}",
"qm"
)
working_dir = os.getcwd()
os.chdir(qm_dir)
infile = "asec.gjf"
operation = None
if self.step.opt:
operation = "forces"
else:
operation = "charges"
logger.info(
f"Calculation of {operation} initiated with Gaussian on {date_time()}\n"
)
if shutil.which("bash") is not None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a {}-step{} {} {}".format(
self.step.gaussian.qmprog, cycle, self.step.gaussian.qmprog, infile
),
]
)
else:
exit_status = subprocess.call([self.step.gaussian.qmprog, infile])
if exit_status != 0:
raise SystemError("Gaussian process did not exit properly")
logger.info(f"Calculation of {operation} finished on {date_time()}")
os.chdir(working_dir)
def _run_formchk(self, cycle: int):
qm_dir = Path(
self.step.simulation_dir,
f"step{(cycle):02d}",
"qm"
)
work_dir = os.getcwd()
os.chdir(qm_dir)
logger.info("Formatting the checkpoint file... \n")
exit_status = subprocess.call(["formchk", "asec.chk"], stdout=subprocess.DEVNULL)
if exit_status != 0:
raise SystemError("Formchk process did not exit properly")
logger.info("Done\n")
os.chdir(work_dir)
def _read_charges_from_fchk(self, cycle: int):
fchk_file_path = Path(
"simfiles",
f"step{cycle:02d}",
"qm",
"asec.fchk"
)
with open(fchk_file_path) as fchk:
fchkfile = fchk.readlines()
if self.step.gaussian.pop in ["chelpg", "mk"]:
CHARGE_FLAG = "ESP Charges"
else:
CHARGE_FLAG = "ESP Charges"
start = fchkfile.pop(0).strip()
while start.find(CHARGE_FLAG) != 0: # expression in begining of line
start = fchkfile.pop(0).strip()
charges: List[float] = []
while len(charges) < len(self.system.molecule[0].atom):
charges.extend([float(x) for x in fchkfile.pop(0).split()])
return charges