Files
DicePlayer/diceplayer/player.py
Vitor Hideyoshi d400970e8f refactor: unify and modernize periodic table utilities and typing
- Replace atomsymb and atommass tuples with AtomInfo dataclass and PTable enum for atomic data
- Refactor all usages to access atomic symbol and mass via PTable methods
- Remove nptyping dependency, switch to numpy.typing for type annotations
- Update molecule and atom classes to use new typing and atomic data access
- Bump numpy version to 2.x and remove nptyping from dependencies
2026-02-27 17:56:11 -03:00

444 lines
15 KiB
Python

from diceplayer import VERSION, logger
from diceplayer.config.player_config import PlayerConfig
from diceplayer.environment import Atom, Molecule, System
from diceplayer.interface import DiceInterface, GaussianInterface
from diceplayer.utils import PTable, weekday_date_time
import yaml
from pydantic import BaseModel
from typing_extensions import Tuple
import os
import pickle
import sys
from pathlib import Path
ENV = ["OMP_STACKSIZE"]
class Player:
def __init__(self, infile: str = None, optimization: bool = False):
if infile is None and optimization is False:
raise ValueError("Must specify either infile or optimization")
elif infile is not None:
self.config = self.set_config(self.read_keywords(infile))
self.system = System()
self.initial_cycle = 1
elif optimization is True:
save = self.load_run_from_pickle()
self.config = save[0]
self.system = save[1]
self.initial_cycle = save[2] + 1
else:
raise ValueError("Must specify either infile or config")
self.dice_interface = DiceInterface()
self.gaussian_interface = GaussianInterface()
def start(self):
logger.info(
"==========================================================================================\n"
"Starting the iterative process.\n"
"==========================================================================================\n"
)
for cycle in range(self.initial_cycle, self.initial_cycle + self.config.maxcyc):
logger.info(
f"------------------------------------------------------------------------------------------\n"
f" Step # {cycle}\n"
f"------------------------------------------------------------------------------------------\n"
)
self.dice_start(cycle)
try:
self.gaussian_start(cycle)
except StopIteration:
break
self.save_run_in_pickle(cycle)
def prepare_system(self):
for i, mol in enumerate(self.system.molecule):
logger.info(f"Molecule {i + 1} - {mol.molname}")
mol.print_mol_info()
logger.info(
"\n Translating and rotating molecule to standard orientation..."
)
mol.standard_orientation()
logger.info("\n Done")
logger.info("\nNew values:\n")
mol.print_mol_info()
logger.info("\n")
def create_simulation_dir(self):
simulation_dir_path = Path(self.config.simulation_dir)
if simulation_dir_path.exists():
raise FileExistsError(
f"Error: a file or a directory {self.config.simulation_dir} already exists,"
f" move or delete the simfiles directory to continue."
)
simulation_dir_path.mkdir()
def create_geoms_file(self):
geoms_file_path = Path(self.config.geoms_file)
if geoms_file_path.exists():
raise FileExistsError(
f"Error: a file or a directory {self.config.geoms_file} already exists,"
f" move or delete the simfiles directory to continue."
)
geoms_file_path.touch()
def print_keywords(self) -> None:
def log_keywords(config: BaseModel):
for key, value in sorted(config.model_dump().items()):
if value is None:
continue
if isinstance(value, list):
string = " ".join(str(x) for x in value)
logger.info(f"{key} = [ {string} ]")
else:
logger.info(f"{key} = {value}")
logger.info(
f"##########################################################################################\n"
f"############# Welcome to DICEPLAYER version {VERSION} #############\n"
f"##########################################################################################\n"
)
logger.info("Your python version is {}\n".format(sys.version))
logger.info("Program started on {}\n".format(weekday_date_time()))
logger.info("Environment variables:")
for var in ENV:
logger.info(
"{} = {}\n".format(
var, (os.environ[var] if var in os.environ else "Not set")
)
)
logger.info(
"------------------------------------------------------------------------------------------\n"
" DICE variables being used in this run:\n"
"------------------------------------------------------------------------------------------\n"
)
log_keywords(self.config.dice)
logger.info(
"------------------------------------------------------------------------------------------\n"
" GAUSSIAN variables being used in this run:\n"
"------------------------------------------------------------------------------------------\n"
)
log_keywords(self.config.gaussian)
logger.info("\n")
def read_potentials(self):
ljname_path = Path(self.config.dice.ljname)
if ljname_path.exists():
with open(self.config.dice.ljname) as file:
ljc_data = file.readlines()
else:
raise RuntimeError(f"Potential file {self.config.dice.ljname} not found.")
combrule = ljc_data.pop(0).split()[0]
if combrule not in ("*", "+"):
sys.exit(
"Error: expected a '*' or a '+' sign in 1st line of file {}".format(
self.config.dice.ljname
)
)
self.config.dice.combrule = combrule
ntypes = ljc_data.pop(0).split()[0]
if not ntypes.isdigit():
sys.exit(
"Error: expected an integer in the 2nd line of file {}".format(
self.config.dice.ljname
)
)
ntypes = int(ntypes)
if ntypes != len(self.config.dice.nmol):
sys.exit(
f"Error: number of molecule types in file {self.config.dice.ljname} "
f"must match that of 'nmol' keyword in config file"
)
for i in range(ntypes):
try:
nsites, molname = ljc_data.pop(0).split()[:2]
except ValueError:
raise ValueError(
f"Error: expected nsites and molname for the molecule type {i + 1}"
)
if not nsites.isdigit():
raise ValueError(
f"Error: expected nsites to be an integer for molecule type {i + 1}"
)
nsites = int(nsites)
self.system.add_type(Molecule(molname))
atom_fields = ["lbl", "na", "rx", "ry", "rz", "chg", "eps", "sig"]
for j in range(nsites):
new_atom = dict(zip(atom_fields, ljc_data.pop(0).split()))
self.system.molecule[i].add_atom(
Atom(**self.validate_atom_dict(i, j, new_atom))
)
def print_potentials(self) -> None:
formatstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f} {:>9.4f}"
logger.info(
"==========================================================================================\n"
f" Potential parameters from file {self.config.dice.ljname}:\n"
"------------------------------------------------------------------------------------------"
"\n"
)
logger.info(f"Combination rule: {self.config.dice.combrule}")
logger.info(f"Types of molecules: {len(self.system.molecule)}\n")
i = 0
for mol in self.system.molecule:
i += 1
logger.info("{} atoms in molecule type {}:".format(len(mol.atom), i))
logger.info(
"---------------------------------------------------------------------------------"
)
logger.info(
"Lbl AN X Y Z Charge Epsilon Sigma Mass"
)
logger.info(
"---------------------------------------------------------------------------------"
)
for atom in mol.atom:
logger.info(
formatstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
atom.mass,
)
)
logger.info("\n")
def dice_start(self, cycle: int):
self.dice_interface.configure(
self.config,
self.system,
)
self.dice_interface.start(cycle)
self.dice_interface.reset()
def gaussian_start(self, cycle: int):
self.gaussian_interface.configure(
self.config,
self.system,
)
result = self.gaussian_interface.start(cycle)
self.gaussian_interface.reset()
if self.config.opt:
if "position" not in result:
raise RuntimeError("Optimization failed. No position found in result.")
self.system.update_molecule(result["position"])
else:
if "charges" not in result:
raise RuntimeError(
"Charges optimization failed. No charges found in result."
)
diff = self.system.molecule[0].update_charges(result["charges"])
self.system.print_charges_and_dipole(cycle)
self.print_geoms(cycle)
if diff < self.config.gaussian.chg_tol:
logger.info(f"Charges converged after {cycle} cycles.")
raise StopIteration()
def print_geoms(self, cycle: int):
with open(self.config.geoms_file, "a") as file:
file.write(f"Cycle # {cycle}\n")
for atom in self.system.molecule[0].atom:
symbol = PTable.get_atomic_symbol(atom.na)
file.write(
f"{symbol:<2s} {atom.rx:>10.6f} {atom.ry:>10.6f} {atom.rz:>10.6f}\n"
)
file.write("\n")
@staticmethod
def validate_atom_dict(molecule_type, molecule_site, atom_dict: dict) -> dict:
molecule_type += 1
molecule_site += 1
if len(atom_dict) < 8:
raise ValueError(
f"Invalid number of fields for site {molecule_site} for molecule type {molecule_type}."
)
try:
atom_dict["lbl"] = int(atom_dict["lbl"])
except Exception:
raise ValueError(
f"Invalid lbl fields for site {molecule_site} for molecule type {molecule_type}."
)
try:
atom_dict["na"] = int(atom_dict["na"])
except Exception:
raise ValueError(
f"Invalid na fields for site {molecule_site} for molecule type {molecule_type}."
)
try:
atom_dict["rx"] = float(atom_dict["rx"])
except Exception:
raise ValueError(
f"Invalid rx fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
try:
atom_dict["ry"] = float(atom_dict["ry"])
except Exception:
raise ValueError(
f"Invalid ry fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
try:
atom_dict["rz"] = float(atom_dict["rz"])
except Exception:
raise ValueError(
f"Invalid rz fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
try:
atom_dict["chg"] = float(atom_dict["chg"])
except Exception:
raise ValueError(
f"Invalid chg fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
try:
atom_dict["eps"] = float(atom_dict["eps"])
except Exception:
raise ValueError(
f"Invalid eps fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
try:
atom_dict["sig"] = float(atom_dict["sig"])
except Exception:
raise ValueError(
f"Invalid sig fields for site {molecule_site} for molecule type {molecule_type}. "
f"Value must be a float."
)
return atom_dict
def print_results(self):
formatstr = "{:<3d} {:>3d} {:>10.5f} {:>10.5f} {:>10.5f} {:>10.6f} {:>9.5f} {:>7.4f} {:>9.4f}"
mol = self.system.molecule[0]
logger.info("{} atoms in molecule type {}:".format(len(mol.atom), 1))
logger.info(
"---------------------------------------------------------------------------------"
)
logger.info(
"Lbl AN X Y Z Charge Epsilon Sigma Mass"
)
logger.info(
"---------------------------------------------------------------------------------"
)
for atom in mol.atom:
logger.info(
formatstr.format(
atom.lbl,
atom.na,
atom.rx,
atom.ry,
atom.rz,
atom.chg,
atom.eps,
atom.sig,
atom.mass,
)
)
logger.info("\n")
def save_run_in_pickle(self, cycle):
try:
with open("latest-step.pkl", "wb") as pickle_file:
pickle.dump((self.config, self.system, cycle), pickle_file)
except Exception:
raise RuntimeError("Could not save pickle file latest-step.pkl.")
@staticmethod
def load_run_from_pickle() -> Tuple[PlayerConfig, System, int]:
pickle_path = Path("latest-step.pkl")
try:
with open(pickle_path, "rb") as pickle_file:
save = pickle.load(pickle_file)
return save[0], save[1], save[2] + 1
except Exception:
raise RuntimeError(f"Could not load pickle file {pickle_path}.")
@staticmethod
def set_config(data: dict) -> PlayerConfig:
return PlayerConfig.model_validate(data)
@staticmethod
def read_keywords(infile) -> dict:
with open(infile, "r") as yml_file:
config = yaml.load(yml_file, Loader=yaml.SafeLoader)
if "diceplayer" in config:
return config.get("diceplayer")
raise RuntimeError(f"Could not find diceplayer section in {infile}.")
@classmethod
def from_file(cls, infile: str) -> "Player":
return cls(infile=infile)
@classmethod
def from_save(cls):
return cls(optimization=True)