Initial Folder Rework Implementation

Adds the Environment, External, Utils folder inside de DPpack. All classes are going to be implemented there
This commit is contained in:
2022-05-31 09:13:08 -03:00
parent f4e892cf34
commit 4ae8385918
23 changed files with 3458 additions and 3611 deletions

786
diceplayer/DPpack/External/Dice.py vendored Normal file
View File

@@ -0,0 +1,786 @@
from dataclasses import dataclass
from diceplayer.DPpack.Utils.PTable import *
from diceplayer.DPpack.Utils.Misc import *
from diceplayer.DPpack.Environment.Molecule import Molecule
from diceplayer.DPpack.Environment.Atom import Atom
from typing import IO, Final, Tuple, List, TextIO, Union
from numpy.core.numeric import partition
from numpy import random
from multiprocessing import Process, connection
import subprocess
import setproctitle
import os
import sys
import shutil
from diceplayer.DPpack.Utils.Validations import NotNull
DICE_END_FLAG: Final[str] = "End of simulation"
DICE_FLAG_LINE: Final[int] = -2
UMAANG3_TO_GCM3: Final[float] = 1.6605
MAX_SEED: Final[int] = 4294967295
class Dice:
title = "Diceplayer run"
progname = "dice"
path = None
nprocs: int = None
randominit = "first"
combrule = "*"
ncores = 1
temp = 300.0
press = 1.0
isave = 1000
dens = None
ljname = None
outname = None
nmol: List[int] = []
nstep: List[int] = []
upbuf = 360
def __init__(self, infile: TextIO, outfile: TextIO) -> None:
self.infile = infile
self.outfile = outfile
@NotNull(requiredArgs = [
"ncores",
"nmol",
"dens",
"nstep",
"ljname",
"outname"
])
def updateKeywords(self, **data):
self.__dict__.update(data)
def __new_density(self, cycle: int, proc: int) -> float:
sim_dir = "simfiles"
step_dir = "step{:02d}".format(cycle - 1)
proc_dir = "p{:02d}".format(proc)
path = sim_dir + os.sep + step_dir + os.sep + proc_dir
file = path + os.sep + "last.xyz"
if not os.path.isfile(file):
sys.exit(
"Error: cannot find the xyz file {} in main directory".format(file)
)
try:
with open(file) as fh:
xyzfile = fh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
box = xyzfile[1].split()
volume = float(box[-3]) * float(box[-2]) * float(box[-1])
total_mass = 0
for i in range(len(self.molecule)):
total_mass += self.molecule[i].total_mass * self.nmol[i]
density = (total_mass / volume) * UMAANG3_TO_GCM3
return density
def __print_last_config(self, cycle: int, proc: int) -> None:
sim_dir = "simfiles"
step_dir = "step{:02d}".format(cycle)
proc_dir = "p{:02d}".format(proc)
path = sim_dir + os.sep + step_dir + os.sep + proc_dir
file = path + os.sep + "phb.xyz"
if not os.path.isfile(file):
sys.exit("Error: cannot find the xyz file {}".format(file))
try:
with open(file) as fh:
xyzfile = fh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
nsites = len(self.molecule[0].atom) * self.nmol[0]
for i in range(1, len(self.nmol)):
nsites += self.nmol[i] * len(self.molecule[i].atom)
nsites += 2
nsites *= -1
xyzfile = xyzfile[nsites:]
file = path + os.sep + "last.xyz"
fh = open(file, "w")
for line in xyzfile:
fh.write(line)
def __make_dice_inputs(self, cycle: int, proc: int) -> None:
sim_dir = "simfiles"
step_dir = "step{:02d}".format(cycle)
proc_dir = "p{:02d}".format(proc)
path = sim_dir + os.sep + step_dir + os.sep + proc_dir
num = time.time()
num = (num - int(num)) * 1e6
num = int((num - int(num)) * 1e6)
random.seed((os.getpid() * num) % (MAX_SEED + 1))
if self.randominit == "first" and cycle > self.initcyc:
last_step_dir = "step{:02d}".format(cycle - 1)
last_path = sim_dir + os.sep + last_step_dir + os.sep + proc_dir
xyzfile = last_path + os.sep + "last.xyz"
self.__make_init_file(path, xyzfile)
if len(self.nstep) == 2:
self.__make_nvt_ter(cycle, path)
self.__make_nvt_eq(path)
elif len(self.nstep) == 3:
if self.randominit == "first" and cycle > self.initcyc:
self.dens = self.__new_density(cycle, proc)
else:
self.__make_nvt_ter(cycle, path)
self.__make_npt_ter(cycle, path)
self.__make_npt_eq(path)
else:
sys.exit("Error: bad number of entries for 'nstep'")
self.__make_potential(path)
def __make_nvt_ter(self, cycle: int, path: str) -> None:
file = path + os.sep + "NVT.ter"
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("title = {} - NVT Thermalization\n".format(self.title))
fh.write("ncores = {}\n".format(self.ncores))
fh.write("ljname = {}\n".format(self.ljname))
fh.write("outname = {}\n".format(self.outname))
string = " ".join(str(x) for x in self.nmol)
fh.write("nmol = {}\n".format(string))
fh.write("dens = {}\n".format(self.dens))
fh.write("temp = {}\n".format(self.temp))
if self.randominit == "first" and cycle > self.initcyc:
fh.write("init = yesreadxyz\n")
fh.write("nstep = {}\n".format(self.altsteps))
else:
fh.write("init = yes\n")
fh.write("nstep = {}\n".format(self.nstep[0]))
fh.write("vstep = 0\n")
fh.write("mstop = 1\n")
fh.write("accum = no\n")
fh.write("iprint = 1\n")
fh.write("isave = 0\n")
fh.write("irdf = 0\n")
seed = int(1e6 * random.random())
fh.write("seed = {}\n".format(seed))
fh.write("upbuf = {}".format(self.upbuf))
fh.close()
def __make_nvt_eq(self, path: str) -> None:
file = path + os.sep + "NVT.eq"
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("title = {} - NVT Production\n".format(self.title))
fh.write("ncores = {}\n".format(self.ncores))
fh.write("ljname = {}\n".format(self.ljname))
fh.write("outname = {}\n".format(self.outname))
string = " ".join(str(x) for x in self.nmol)
fh.write("nmol = {}\n".format(string))
fh.write("dens = {}\n".format(self.dens))
fh.write("temp = {}\n".format(self.temp))
fh.write("init = no\n")
fh.write("nstep = {}\n".format(self.nstep[1]))
fh.write("vstep = 0\n")
fh.write("mstop = 1\n")
fh.write("accum = no\n")
fh.write("iprint = 1\n")
fh.write("isave = {}\n".format(self.isave))
fh.write("irdf = {}\n".format(10 * self.nprocs))
seed = int(1e6 * random.random())
fh.write("seed = {}\n".format(seed))
fh.close()
def __make_npt_ter(self, cycle: int, path: str) -> None:
file = path + os.sep + "NPT.ter"
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("title = {} - NPT Thermalization\n".format(self.title))
fh.write("ncores = {}\n".format(self.ncores))
fh.write("ljname = {}\n".format(self.ljname))
fh.write("outname = {}\n".format(self.outname))
string = " ".join(str(x) for x in self.nmol)
fh.write("nmol = {}\n".format(string))
fh.write("press = {}\n".format(self.press))
fh.write("temp = {}\n".format(self.temp))
if self.randominit == "first" and cycle > self.initcyc:
fh.write("init = yesreadxyz\n")
fh.write("dens = {:<8.4f}\n".format(self.dens))
fh.write("vstep = {}\n".format(int(self.altsteps / 5)))
else:
fh.write("init = no\n")
fh.write("vstep = {}\n".format(int(self.nstep[1] / 5)))
fh.write("nstep = 5\n")
fh.write("mstop = 1\n")
fh.write("accum = no\n")
fh.write("iprint = 1\n")
fh.write("isave = 0\n")
fh.write("irdf = 0\n")
seed = int(1e6 * random.random())
fh.write("seed = {}\n".format(seed))
fh.close()
def __make_npt_eq(self, path: str) -> None:
file = path + os.sep + "NPT.eq"
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("title = {} - NPT Production\n".format(self.title))
fh.write("ncores = {}\n".format(self.ncores))
fh.write("ljname = {}\n".format(self.ljname))
fh.write("outname = {}\n".format(self.outname))
string = " ".join(str(x) for x in self.nmol)
fh.write("nmol = {}\n".format(string))
fh.write("press = {}\n".format(self.press))
fh.write("temp = {}\n".format(self.temp))
fh.write("nstep = 5\n")
fh.write("vstep = {}\n".format(int(self.nstep[2] / 5)))
fh.write("init = no\n")
fh.write("mstop = 1\n")
fh.write("accum = no\n")
fh.write("iprint = 1\n")
fh.write("isave = {}\n".format(self.isave))
fh.write("irdf = {}\n".format(10 * self.nprocs))
seed = int(1e6 * random.random())
fh.write("seed = {}\n".format(seed))
fh.close()
def __make_init_file(self, path: str, file: TextIO) -> None:
if not os.path.isfile(file):
sys.exit(
"Error: cannot find the xyz file {} in main directory".format(file)
)
try:
with open(file) as fh:
xyzfile = fh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
nsites_mm = 0
for i in range(1, len(self.nmol)):
nsites_mm += self.nmol[i] * len(self.molecule[i].atom)
nsites_mm *= -1
xyzfile = xyzfile[nsites_mm:]
file = path + os.sep + self.outname + ".xy"
try:
fh = open(file, "w", 1)
except:
sys.exit("Error: cannot open file {}".format(file))
for atom in self.molecule[0].atom:
fh.write(
"{:>10.6f} {:>10.6f} {:>10.6f}\n".format(atom.rx, atom.ry, atom.rz)
)
for line in xyzfile:
atom = line.split()
rx = float(atom[1])
ry = float(atom[2])
rz = float(atom[3])
fh.write("{:>10.6f} {:>10.6f} {:>10.6f}\n".format(rx, ry, rz))
fh.write("$end")
fh.close()
def __make_potential(self, path: str) -> None:
fstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f}\n"
file = path + os.sep + self.ljname
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("{}\n".format(self.combrule))
fh.write("{}\n".format(len(self.nmol)))
nsites_qm = (
len(self.molecule[0].atom)
+ len(self.molecule[0].ghost_atoms)
+ len(self.molecule[0].lp_atoms)
)
fh.write("{} {}\n".format(nsites_qm, self.molecule[0].molname))
for atom in self.molecule[0].atom:
fh.write(
fstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
)
)
ghost_label = self.molecule[0].atom[-1].lbl + 1
for i in self.molecule[0].ghost_atoms:
fh.write(
fstr.format(
ghost_label,
ghost_number,
self.molecule[0].atom[i].rx,
self.molecule[0].atom[i].ry,
self.molecule[0].atom[i].rz,
self.molecule[0].atom[i].chg,
0,
0,
)
)
ghost_label += 1
for lp in self.molecule[0].lp_atoms:
fh.write(
fstr.format(
ghost_label,
ghost_number,
lp["rx"],
lp["ry"],
lp["rz"],
lp["chg"],
0,
0,
)
)
for mol in self.molecule[1:]:
fh.write("{} {}\n".format(len(mol.atom), mol.molname))
for atom in mol.atom:
fh.write(
fstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
)
)
def __make_proc_dir(self, cycle: int, proc: int) -> None:
sim_dir = "simfiles"
step_dir = "step{:02d}".format(cycle)
proc_dir = "p{:02d}".format(proc)
path = sim_dir + os.sep + step_dir + os.sep + proc_dir
try:
os.makedirs(path)
except:
sys.exit("Error: cannot make directory {}".format(path))
def __run_dice(self, cycle: int, proc: int, fh: str) -> None:
sim_dir = "simfiles"
step_dir = "step{:02d}".format(cycle)
proc_dir = "p{:02d}".format(proc)
try:
fh.write(
"Simulation process {} initiated with pid {}\n".format(
sim_dir + os.sep + step_dir + os.sep + proc_dir, os.getpid()
)
)
except Exception as err:
print("I/O error({0}): {1}".format(err))
path = sim_dir + os.sep + step_dir + os.sep + proc_dir
working_dir = os.getcwd()
os.chdir(path)
if len(self.nstep) == 2:
if self.randominit == "first" and cycle > self.initcyc:
string_tmp = "previous"
else:
string_tmp = "random"
string = "(from " + string_tmp + " configuration)"
fh.write(
"p{:02d}> NVT thermalization finished {} on {}\n".format(
proc, string, date_time()
)
)
infh = open("NVT.ter")
outfh = open("NVT.ter.out", "w")
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a dice-step{}-p{} {} < {} > {}".format(
cycle, proc, self.progname, infh.name, outfh.name
),
]
)
else:
exit_status = subprocess.call(
self.progname, stin=infh.name, stout=outfh.name
)
infh.close()
outfh.close()
if os.getppid() == 1:
sys.exit()
if exit_status != 0:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
else:
outfh = open("NVT.ter.out")
flag = outfh.readlines()[DICE_FLAG_LINE].strip()
outfh.close()
if flag != DICE_END_FLAG:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
fh.write(
"p{:02d}> NVT production initiated on {}\n".format(proc, date_time())
)
infh = open("NVT.eq")
outfh = open("NVT.eq.out", "w")
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a dice-step{}-p{} {} < {} > {}".format(
cycle, proc, self.progname, infh.name, outfh.name
),
]
)
else:
exit_status = subprocess.call(
self.progname, stin=infh.name, stout=outfh.name
)
infh.close()
outfh.close()
if os.getppid() == 1:
sys.exit()
if exit_status != 0:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
else:
outfh = open("NVT.eq.out")
flag = outfh.readlines()[DICE_FLAG_LINE].strip()
outfh.close()
if flag != DICE_END_FLAG:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
fh.write(
"p{:02d}> ----- NVT production finished on {}\n".format(
proc, date_time()
)
)
elif len(self.nstep) == 3:
if (
self.randominit == "always"
or (self.randominit == "first" and cycle == 1)
or self.continued
):
string = "(from random configuration)"
fh.write(
"p{:02d}> NVT thermalization initiated {} on {}\n".format(
proc, string, date_time()
)
)
infh = open("NVT.ter")
outfh = open("NVT.ter.out", "w")
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a dice-step{}-p{} {} < {} > {}".format(
cycle, proc, self.progname, infh.name, outfh.name
),
]
)
else:
exit_status = subprocess.call(
self.progname, stin=infh.name, stout=outfh.name
)
infh.close()
outfh.close()
if os.getppid() == 1:
sys.exit()
if exit_status != 0:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
else:
outfh = open("NVT.ter.out")
flag = outfh.readlines()[DICE_FLAG_LINE].strip()
outfh.close()
if flag != DICE_END_FLAG:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
if not self.randominit == "always" or (
(self.randominit == "first" and cycle > self.initcyc)
):
string = " (from previous configuration) "
else:
string = " "
fh.write(
"p{:02d}> NPT thermalization finished {} on {}\n".format(
proc, string, date_time()
)
)
infh = open("NPT.ter")
outfh = open("NPT.ter.out", "w")
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a dice-step{}-p{} {} < {} > {}".format(
cycle, proc, self.progname, infh.name, outfh.name
),
]
)
else:
exit_status = subprocess.call(
self.progname, stin=infh.name, stout=outfh.name
)
infh.close()
outfh.close()
if os.getppid() == 1:
sys.exit()
if exit_status != 0:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
else:
outfh = open("NPT.ter.out")
flag = outfh.readlines()[DICE_FLAG_LINE].strip()
outfh.close()
if flag != DICE_END_FLAG:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
fh.write(
"p{:02d}> NPT production initiated on {}\n".format(proc, date_time())
)
infh = open("NPT.eq")
outfh = open("NPT.eq.out", "w")
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a dice-step{}-p{} {} < {} > {}".format(
cycle, proc, self.progname, infh.name, outfh.name
),
]
)
else:
exit_status = subprocess.call(
self.progname, stin=infh.name, stout=outfh.name
)
infh.close()
outfh.close()
if os.getppid() == 1:
sys.exit()
if exit_status != 0:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
else:
outfh = open("NPT.eq.out")
flag = outfh.readlines()[DICE_FLAG_LINE].strip()
outfh.close()
if flag != DICE_END_FLAG:
sys.exit(
"Dice process step{:02d}-p{:02d} did not exit properly".format(
cycle, proc
)
)
fh.write(
"p{:02d}> ----- NPT production finished on {}\n".format(
proc, date_time()
)
)
os.chdir(working_dir)
def __simulation_process(self, cycle: int, proc: int):
setproctitle.setproctitle("diceplayer-step{:0d}-p{:0d}".format(cycle, proc))
try:
self.__make_proc_dir(cycle, proc)
self.__make_dice_inputs(cycle, proc)
self.__run_dice(cycle, proc, self.outfile)
except Exception as err:
sys.exit(err)
def configure(
self,
initcyc: int,
nprocs: int,
altsteps: int,
nmol: List[int],
molecule: List[Molecule],
):
self.initcyc = initcyc
self.nprocs = nprocs
self.altsteps = altsteps
self.nmol = nmol
self.molecule = molecule
def start(self, cycle: int) -> None:
procs = []
sentinels = []
for proc in range(1, self.nprocs + 1):
p = Process(target=self.__simulation_process, args=(cycle, proc))
p.start()
procs.append(p)
sentinels.append(p.sentinel)
while procs:
finished = connection.wait(sentinels)
for proc_sentinel in finished:
i = sentinels.index(proc_sentinel)
status = procs[i].exitcode
procs.pop(i)
sentinels.pop(i)
if status != 0:
for p in procs:
p.terminate()
sys.exit(status)
for proc in range(1, self.nprocs + 1):
self.__print_last_config(cycle, proc)
def reset(self):
del self.nprocs
del self.altsteps
del self.molecule

688
diceplayer/DPpack/External/Gaussian.py vendored Normal file
View File

@@ -0,0 +1,688 @@
from turtle import position
from diceplayer.DPpack.Environment.Atom import *
from diceplayer.DPpack.Utils.PTable import *
from diceplayer.DPpack.Utils.Misc import *
from diceplayer.DPpack.External.Dice import *
from diceplayer.DPpack.Environment.Molecule import Molecule
from diceplayer.DPpack.Environment.Atom import Atom
from typing import IO, List, TextIO
import numpy as np
import subprocess
import os
import sys
import shutil
import textwrap
import types
class Gaussian:
qmprog = "g09"
path = None
mem = None
keywords = None
chgmult = [0, 1]
gmiddle = None # In each case, if a filename is given, its content will be
gbottom = None # inserted in the gaussian input
pop = "chelpg"
level = None
def __init__(self) -> None:
pass
@NotNull(requiredArgs = [
"level"
])
def updateKeywords(self, **data):
self.__dict__.update(data)
def run_formchk(self, cycle: int, fh: TextIO):
simdir = "simfiles"
stepdir = "step{:02d}".format(cycle)
path = simdir + os.sep + stepdir + os.sep + "qm"
work_dir = os.getcwd()
os.chdir(path)
fh.write("Formatting the checkpoint file... \n")
exit_status = subprocess.call(["formchk", "asec.chk"], stdout=fh)
fh.write("Done\n")
os.chdir(work_dir)
def read_forces_fchk(self, file: str, fh: TextIO) -> np.ndarray:
forces = []
try:
with open(file) as tmpfh:
fchkfile = tmpfh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
start = fchkfile.pop(0).strip()
while start.find("Cartesian Gradient") != 0: # expression in begining of line
start = fchkfile.pop(0).strip()
degrees = 3 * len(self.molecule[0].atom)
count = 0
while len(forces) < degrees:
values = fchkfile.pop(0).split()
forces.extend([float(x) for x in values])
count += len(values)
if count >= degrees:
forces = forces[:degrees]
break
gradient = np.array(forces)
fh.write("\nGradient read from file {}:\n".format(file))
fh.write(
"-----------------------------------------------------------------------\n"
"Center Atomic Forces (Hartree/Bohr)\n"
"Number Number X Y Z\n"
"-----------------------------------------------------------------------\n"
)
for i in range(len(self.molecule[0].atom)):
fh.write(
" {:>5d} {:>3d} {:>14.9f} {:>14.9f} {:>14.9f}\n".format(
i + 1,
self.molecule[0].atom[i].na,
forces.pop(0),
forces.pop(0),
forces.pop(0),
)
)
fh.write(
"-----------------------------------------------------------------------\n"
)
force_max = np.amax(np.absolute(gradient))
force_rms = np.sqrt(np.mean(np.square(gradient)))
fh.write(
" Max Force = {:>14.9f} RMS Force = {:>14.9f}\n\n".format(
force_max, force_rms
)
)
return gradient
def read_hessian_fchk(self, file: str) -> np.ndarray:
force_const = []
try:
with open(file) as tmpfh:
fchkfile = tmpfh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
start = fchkfile.pop(0).strip()
while start.find("Cartesian Force Constants") != 0:
start = fchkfile.pop(0).strip()
degrees = 3 * len(self.molecule[0].atom)
last = round(degrees * (degrees + 1) / 2)
count = 0
while len(force_const) < last:
value = fchkfile.pop(0).split()
force_const.extend([float(x) for x in value])
# while len(force_const) < last:
# values = fchkfile.pop(0).split()
# force_const.extend([ float(x) for x in values ])
# count += len(values)
# if count >= last:
# force_const = force_const[:last]
# break
hessian = np.zeros((degrees, degrees))
for i in range(degrees):
for j in range(i + 1):
hessian[i, j] = force_const.pop(0)
hessian[j, i] = hessian[i, j]
return hessian
def read_hessian_log(self, file: str) -> np.ndarray:
try:
with open(file) as tmpfh:
logfile = tmpfh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
start = logfile.pop(0).strip()
while start.find("The second derivative matrix:") != 0:
start = logfile.pop(0).strip()
degrees = 3 * len(self.molecule[0].atom)
hessian = np.zeros((degrees, degrees))
k = 0
while k < degrees:
logfile.pop(0)
for i in range(k, degrees):
values = logfile.pop(0).split()[1:]
for j in range(k, min(i + 1, k + 5)):
hessian[i, j] = float(values.pop(0))
hessian[j, i] = hessian[i, j]
k += 5
return hessian
def print_grad_hessian(
self, cycle: int, cur_gradient: np.ndarray, hessian: np.ndarray
) -> None:
try:
fh = open("grad_hessian.dat", "w")
except:
sys.exit("Error: cannot open file grad_hessian.dat")
fh.write("Optimization cycle: {}\n".format(cycle))
fh.write("Cartesian Gradient\n")
degrees = 3 * len(self.molecule[0].atom)
for i in range(degrees):
fh.write(" {:>11.8g}".format(cur_gradient[i]))
if (i + 1) % 5 == 0 or i == degrees - 1:
fh.write("\n")
fh.write("Cartesian Force Constants\n")
n = int(np.sqrt(2 * degrees))
last = degrees * (degrees + 1) / 2
count = 0
for i in range(n):
for j in range(i + 1):
count += 1
fh.write(" {:>11.8g}".format(hessian[i, j]))
if count % 5 == 0 or count == last:
fh.write("\n")
fh.close()
# Change the name to make_gaussian_input
def make_gaussian_input(self, cycle: int, asec_charges=None) -> None:
simdir = "simfiles"
stepdir = "step{:02d}".format(cycle)
path = simdir + os.sep + stepdir + os.sep + "qm"
file = path + os.sep + "asec.gjf"
try:
fh = open(file, "w")
except:
sys.exit("Error: cannot open file {}".format(file))
fh.write("%Chk=asec.chk\n")
if self.mem != None:
fh.write("%Mem={}MB\n".format(self.mem))
fh.write("%Nprocs={}\n".format(self.nprocs * self.ncores))
kword_line = "#P " + str(self.level)
if self.keywords != None:
kword_line += " " + self.keywords
if self.opt == "yes":
kword_line += " Force"
# kword_line += " Charge"
kword_line += " NoSymm"
kword_line += " Pop={} Density=Current".format(self.pop)
if cycle > 1:
kword_line += " Guess=Read"
fh.write(textwrap.fill(kword_line, 90))
fh.write("\n")
fh.write("\nForce calculation - Cycle number {}\n".format(cycle))
fh.write("\n")
fh.write("{},{}\n".format(self.chgmult[0], self.chgmult[1]))
for atom in self.molecule[0].atom:
symbol = atomsymb[atom.na]
fh.write(
"{:<2s} {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
symbol, atom.rx, atom.ry, atom.rz
)
)
# ## If also performing charge fit in the same calculation
# if cycle >= self.player.switchcyc:
# for ghost in ghost_atoms:
# fh.write("Bq {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
# ghost['rx'], ghost['ry'], ghost['rz']))
# for lp in lp_atoms:
# fh.write("Bq {:>10.5f} {:>10.5f} {:>10.5f}\n".format(
# lp['rx'], lp['ry'], lp['rz']))
# fh.write("\n")
# If gmiddle file was informed, write its contents in asec.gjf
# if self.gmiddle != None:
# if not os.path.isfile(self.gmiddle):
# sys.exit("Error: cannot find file {} in main directory".format(
# self.gmiddle))
# try:
# with open(self.gmiddle) as gmiddlefile:
# gmiddle = gmiddlefile.readlines()
# except:
# sys.exit("Error: cannot open file {}".format(self.gmiddle))
# for line in gmiddle:
# fh.write(line)
# fh.write("\n")
# ## Write the ASEC:
# for charge in asec_charges:
# fh.write("{:>10.5f} {:>10.5f} {:>10.5f} {:>11.8f}\n".format(
# charge['rx'], charge['ry'], charge['rz'], charge['chg']))
fh.write("\n")
# ## If gbottom file was informed, write its contents in asec.gjf
# if self.gbottom != None:
# if not os.path.isfile(self.gbottom):
# sys.exit("Error: cannot find file {} in main directory".format(
# self.gbottom))
# try:
# with open(self.gbottom) as gbottomfile:
# gbottom = gbottomfile.readlines()
# except:
# sys.exit("Error: cannot open file {}".format(self.gbottom))
# for line in gbottom:
# fh.write(line)
# fh.write("\n")
# fh.close()
def read_charges(self, file: str, fh: TextIO) -> None:
try:
with open(file) as tmpfh:
glogfile = tmpfh.readlines()
except:
sys.exit("Error: cannot open file {}".format(file))
start = glogfile.pop(0).strip()
while start != "Fitting point charges to electrostatic potential":
start = glogfile.pop(0).strip()
glogfile = glogfile[3:] # Consume 3 more lines
fh.write("\nAtomic charges:\n")
fh.write("------------------------------------\n")
for atom in self.molecule[0].atom:
line = glogfile.pop(0).split()
atom_str = line[1]
charge = float(line[2])
atom.chg = charge
fh.write(" {:<2s} {:>10.6f}\n".format(atom_str, charge))
# if self.pop == "chelpg":
# for ghost in ghost_atoms:
# line = glogfile.pop(0).split()
# atom_str = line[1]
# charge = float(line[2])
# ghost['chg'] = charge
# fh.write(" {:<2s} {:>10.6f}\n".format(atom_str, charge))
# for lp in lp_atoms:
# line = glogfile.pop(0).split()
# atom_str = line[1]
# charge = float(line[2])
# lp['chg'] = charge
# fh.write(" {:<2s} {:>10.6f}\n".format(atom_str, charge))
fh.write("------------------------------------\n")
def run_gaussian(self, cycle: int, type: str, fh: TextIO) -> None:
simdir = "simfiles"
stepdir = "step{:02d}".format(cycle)
path = simdir + os.sep + stepdir + os.sep + "qm"
work_dir = os.getcwd()
os.chdir(path)
# if type == "force":
# infile = "asec.gjf"
# elif type == "charge":
# infile = "asec2.gjf"
infile = "asec.gjf"
fh.write(
"\nCalculation of {}s initiated with Gaussian on {}\n".format(
type, date_time()
)
)
if shutil.which("bash") != None:
exit_status = subprocess.call(
[
"bash",
"-c",
"exec -a {}-step{} {} {}".format(
self.qmprog, cycle, self.qmprog, infile
),
]
)
else:
exit_status = subprocess.call([self.qmprog, infile])
if exit_status != 0:
sys.exit("Gaussian process did not exit properly")
fh.write("Calculation of {}s finished on {}\n".format(type, date_time()))
os.chdir(work_dir)
# def calculate_step(
# self, cycle: int, gradient: np.ndarray, hessian: np.ndarray
# ) -> np.ndarray:
# invhessian = np.linalg.inv(hessian)
# pre_step = -1 * np.matmul(invhessian, gradient.T).T
# maxstep = np.amax(np.absolute(pre_step))
# factor = min(1, self.player.maxstep / maxstep)
# step = factor * pre_step
# self.outfile.write("\nCalculated step-{}:\n".format(cycle))
# pre_step_list = pre_step.tolist()
# self.outfile.write(
# "-----------------------------------------------------------------------\n"
# "Center Atomic Step (Bohr)\n"
# "Number Number X Y Z\n"
# "-----------------------------------------------------------------------\n"
# )
# for i in range(len(self.system.molecule[0].atom)):
# self.outfile.write(
# " {:>5d} {:>3d} {:>14.9f} {:>14.9f} {:>14.9f}\n".format(
# i + 1,
# self.system.molecule[0].atom[i].na,
# pre_step_list.pop(0),
# pre_step_list.pop(0),
# pre_step_list.pop(0),
# )
# )
# self.outfile.write(
# "-----------------------------------------------------------------------\n"
# )
# self.outfile.write("Maximum step is {:>11.6}\n".format(maxstep))
# self.outfile.write("Scaling factor = {:>6.4f}\n".format(factor))
# self.outfile.write("\nFinal step (Bohr):\n")
# step_list = step.tolist()
# self.outfile.write(
# "-----------------------------------------------------------------------\n"
# "Center Atomic Step (Bohr)\n"
# "Number Number X Y Z\n"
# "-----------------------------------------------------------------------\n"
# )
# for i in range(len(self.system.molecule[0].atom)):
# self.outfile.write(
# " {:>5d} {:>3d} {:>14.9f} {:>14.9f} {:>14.9f}\n".format(
# i + 1,
# self.system.molecule[0].atom[i].na,
# step_list.pop(0),
# step_list.pop(0),
# step_list.pop(0),
# )
# )
# self.outfile.write(
# "-----------------------------------------------------------------------\n"
# )
# step_max = np.amax(np.absolute(step))
# step_rms = np.sqrt(np.mean(np.square(step)))
# self.outfile.write(
# " Max Step = {:>14.9f} RMS Step = {:>14.9f}\n\n".format(
# step_max, step_rms
# )
# )
# return step
def configure(
self,
initcyc: int,
nprocs: int,
ncores: int,
altsteps: int,
switchcyc: int,
opt: str,
nmol: List[int],
molecule: List[Molecule],
):
self.initcyc = initcyc
self.nprocs = nprocs
self.ncores = ncores
self.altsteps = altsteps
self.switchcyc = switchcyc
self.opt = opt
self.molecule = molecule
def start(self, cycle: int, outfile: TextIO, readhessian: str) -> np.ndarray:
make_qm_dir(cycle)
if self.qmprog in ("g03", "g09", "g16"):
if cycle > 1:
src = (
"simfiles"
+ os.sep
+ "step{:02d}".format(cycle - 1)
+ os.sep
+ "qm"
+ os.sep
+ "asec.chk"
)
dst = (
"simfiles"
+ os.sep
+ "step{:02d}".format(cycle)
+ os.sep
+ "qm"
+ os.sep
+ "asec.chk"
)
shutil.copyfile(src, dst)
self.make_gaussian_input(cycle)
self.run_gaussian(cycle, "force", outfile)
self.run_formchk(cycle, outfile)
## Read the gradient
file = (
"simfiles"
+ os.sep
+ "step{:02d}".format(cycle)
+ os.sep
+ "qm"
+ os.sep
+ "asec.fchk"
)
try:
gradient
old_gradient = gradient
except:
pass
gradient = self.read_forces_fchk(file, outfile)
# If 1st step, read the hessian
if cycle == 1:
if readhessian == "yes":
file = "grad_hessian.dat"
outfile.write(
"\nReading the hessian matrix from file {}\n".format(file)
)
hessian = self.read_hessian_log(file)
else:
file = (
"simfiles"
+ os.sep
+ "step01"
+ os.sep
+ "qm"
+ os.sep
+ "asec.fchk"
)
outfile.write(
"\nReading the hessian matrix from file {}\n".format(file)
)
hessian = self.read_hessian_fchk(file)
# From 2nd step on, update the hessian
else:
outfile.write(
"\nUpdating the hessian matrix using the BFGS method... "
)
hessian = self.molecule[0].update_hessian(
step, gradient, old_gradient, hessian
)
outfile.write("Done\n")
# Save gradient and hessian
self.print_grad_hessian(cycle, gradient, hessian)
# Calculate the step and update the position
step = self.calculate_step(cycle, gradient, hessian)
position += step
## If needed, calculate the charges
if cycle < self.switchcyc:
# internal.gaussian.make_charge_input(cycle, asec_charges)
self.run_gaussian(cycle, "charge", outfile)
file = (
"simfiles"
+ os.sep
+ "step{:02d}".format(cycle)
+ os.sep
+ "qm"
+ os.sep
+ "asec2.log"
)
self.read_charges(file, outfile)
else:
file = (
"simfiles"
+ os.sep
+ "step{:02d}".format(cycle)
+ os.sep
+ "qm"
+ os.sep
+ "asec.log"
)
self.read_charges(file, outfile)
## Print new info for molecule[0]
self.outfile.write("\nNew values for molecule type 1:\n\n")
self.molecule[0].print_mol_info(outfile)
##
## Molcas block
##
# if player['qmprog'] == "molcas":
# elif player['opt'] == "ts":
##
## Gaussian block
##
# if player['qmprog'] in ("g03", "g09", "g16"):
##
## Molcas block
##
# if player['qmprog'] == "molcas":
# else: ## Only relax the charge distribution
# if internal.player.qmprog in ("g03", "g09", "g16"):
# if cycle > 1:
# src = (
# "simfiles"
# + os.sep
# + "step{:02d}".format(cycle - 1)
# + os.sep
# + "qm"
# + os.sep
# + "asec.chk"
# )
# dst = (
# "simfiles"
# + os.sep
# + "step{:02d}".format(cycle)
# + os.sep
# + "qm"
# + os.sep
# + "asec.chk"
# )
# shutil.copyfile(src, dst)
# # internal.gaussian.make_charge_input(cycle, asec_charges)
# internal.gaussian.run_gaussian(cycle, "charge", internal.outfile)
# file = (
# "simfiles"
# + os.sep
# + "step{:02d}".format(cycle)
# + os.sep
# + "qm"
# + os.sep
# + "asec2.log"
# )
# internal.read_charges(file)
# ## Print new info for molecule[0]
# internal.outfile.write("\nNew values for molecule type 1:\n\n")
# internal.system.molecule[0].print_mol_info()
# #if player['qmprog'] == "molcas"
return position
def reset(self):
del self.nprocs
del self.altsteps
del self.molecule

View File