from diceplayer.shared.config.step_dto import StepDTO from diceplayer.shared.environment.atom import Atom from diceplayer.shared.utils.dataclass_protocol import Dataclass from diceplayer.shared.config.gaussian_dto import GaussianDTO from diceplayer.shared.environment.molecule import Molecule from diceplayer.shared.environment.system import System from diceplayer.shared.utils.misc import weekday_date_time from diceplayer.shared.config.player_dto import PlayerDTO from diceplayer.shared.interface.gaussian_interface import GaussianInterface from diceplayer.shared.config.dice_dto import DiceDTO from diceplayer.shared.interface.dice_interface import DiceInterface from dataclasses import fields from pathlib import Path from typing import Type import logging import yaml import sys import os from diceplayer.shared.utils.ptable import atommass ENV = ["OMP_STACKSIZE"] class Player: __slots__ = [ 'config', 'system', 'dice', 'gaussian', ] def __init__(self, infile: str): config_data = self.read_keywords(infile) self.system = System() self.config = self.set_config( config_data.get("diceplayer") ) self.gaussian = GaussianInterface(config_data.get("gaussian")) self.dice = DiceInterface(config_data.get("dice")) def start(self): self.print_keywords() self.create_simulation_dir() self.read_potentials() # self.print_potentials() self.dice_start(1) self.dice_start(2) def create_simulation_dir(self): simulation_dir_path = Path(self.config.simulation_dir) if simulation_dir_path.exists(): raise FileExistsError( f"Error: a file or a directory {self.config.simulation_dir} already exists," f" move or delete the simfiles directory to continue." ) try: simulation_dir_path.mkdir() except FileExistsError: OSError( f"Error: cannot make directory {self.config.simulation_dir}" ) def print_keywords(self) -> None: def log_keywords(config: Dataclass, dto: Type[Dataclass]): for key in sorted(list(map(lambda f: f.name, fields(dto)))): if getattr(config, key) is not None: if isinstance(getattr(config, key), list): string = " ".join(str(x) for x in getattr(config, key)) logging.info(f"{key} = [ {string} ]") else: logging.info(f"{key} = {getattr(config, key)}") logging.info( "##########################################################################################\n" "############# Welcome to DICEPLAYER version 1.0 #############\n" "##########################################################################################\n" "\n" ) logging.info("Your python version is {}\n".format(sys.version)) logging.info("\n") logging.info("Program started on {}\n".format(weekday_date_time())) logging.info("\n") logging.info("Environment variables:\n") for var in ENV: logging.info( "{} = {}\n".format( var, (os.environ[var] if var in os.environ else "Not set") ) ) logging.info( "\n==========================================================================================\n" " CONTROL variables being used in this run:\n" "------------------------------------------------------------------------------------------\n" "\n" ) logging.info("\n") logging.info( "------------------------------------------------------------------------------------------\n" " DICE variables being used in this run:\n" "------------------------------------------------------------------------------------------\n" "\n" ) log_keywords(self.dice.config, DiceDTO) logging.info("\n") logging.info( "------------------------------------------------------------------------------------------\n" " GAUSSIAN variables being used in this run:\n" "------------------------------------------------------------------------------------------\n" "\n" ) log_keywords(self.gaussian.config, GaussianDTO) logging.info("\n") def read_potentials(self): try: with open(self.dice.config.ljname) as file: ljdata = file.readlines() except FileNotFoundError: raise RuntimeError( f"Potential file {self.dice.config.ljname} not found." ) combrule = ljdata.pop(0).split()[0] if combrule not in ("*", "+"): sys.exit( "Error: expected a '*' or a '+' sign in 1st line of file {}".format( self.dice.config.ljname ) ) self.dice.config.combrule = combrule ntypes = ljdata.pop(0).split()[0] if not ntypes.isdigit(): sys.exit( "Error: expected an integer in the 2nd line of file {}".format( self.dice.config.ljname ) ) ntypes = int(ntypes) if ntypes != len(self.dice.config.nmol): sys.exit( f"Error: number of molecule types in file {self.dice.config.ljname}" f"must match that of 'nmol' keyword in config file" ) for i in range(ntypes): nsites, molname = ljdata.pop(0).split()[:2] if not nsites.isdigit(): raise ValueError( f"Error: expected nsites to be an integer for molecule type {i}" ) if molname is None: raise ValueError( f"Error: expected molecule name for molecule type {i}" ) nsites = int(nsites) self.system.add_type(nsites, Molecule(molname)) atom_fields = ["lbl", "na", "rx", "ry", "rz", "chg", "eps", "sig"] for j in range(nsites): new_atom = dict(zip( atom_fields, ljdata.pop(0).split() )) self.system.molecule[i].add_atom( Atom(**self.validate_atom_dict(i, j, new_atom)) ) def dice_start(self, cycle: int): self.dice.configure( StepDTO( ncores=self.config.ncores, nprocs=self.config.nprocs, simulation_dir=self.config.simulation_dir, altsteps=self.config.altsteps, molecule=self.system.molecule, nmol=self.system.nmols, ) ) self.dice.start(cycle) self.dice.reset() def gaussian_start(self): self.gaussian.start() @staticmethod def validate_atom_dict(molecule_type, molecule_site, atom_dict: dict) -> dict: molecule_type += 1 molecule_site += 1 if len(atom_dict) < 8: raise ValueError( f'Invalid number of fields for site {molecule_site} for molecule type {molecule_type}.' ) try: atom_dict['lbl'] = int(atom_dict['lbl']) except ValueError: raise ValueError( f'Invalid lbl fields for site {molecule_site} for molecule type {molecule_type}.' ) try: atom_dict['na'] = int(atom_dict['na']) except ValueError: raise ValueError( f'Invalid na fields for site {molecule_site} for molecule type {molecule_type}.' ) try: atom_dict['rx'] = float(atom_dict['rx']) except ValueError: raise ValueError( f'Invalid rx fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) try: atom_dict['ry'] = float(atom_dict['ry']) except ValueError: raise ValueError( f'Invalid ry fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) try: atom_dict['rz'] = float(atom_dict['rx']) except ValueError: raise ValueError( f'Invalid rz fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) try: atom_dict['chg'] = float(atom_dict['chg']) except ValueError: raise ValueError( f'Invalid chg fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) try: atom_dict['eps'] = float(atom_dict['eps']) except ValueError: raise ValueError( f'Invalid eps fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) try: atom_dict['sig'] = float(atom_dict['sig']) except ValueError: raise ValueError( f'Invalid sig fields for site {molecule_site} for molecule type {molecule_type}.' f'Value must be a float.' ) return atom_dict @staticmethod def set_config(data: dict) -> PlayerDTO: return PlayerDTO.from_dict(data) @staticmethod def read_keywords(infile) -> dict: with open(infile, 'r') as yml_file: return yaml.load(yml_file, Loader=yaml.SafeLoader)