Implementation of Logger and Finishes DiceInterface Tests

This commit is contained in:
2023-04-27 03:48:23 -03:00
parent 420b36b872
commit f1deff4786
8 changed files with 517 additions and 26 deletions

9
.gitignore vendored
View File

@@ -4,3 +4,12 @@ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
.vscode/settings.json .vscode/settings.json
*.ljc
*.log
*.log.backup
simfiles/*
.vscode/*
.idea/*

View File

@@ -167,7 +167,7 @@ class DiceInterface(Interface):
xyz_lines = last_xyz_file.readlines() xyz_lines = last_xyz_file.readlines()
nsites_mm = 0 nsites_mm = 0
for i in range(len(self.step.nmol)): for i in range(1, len(self.step.nmol)):
nsites_mm += self.step.nmol[i] * len(self.step.molecule[i].atom) nsites_mm += self.step.nmol[i] * len(self.step.molecule[i].atom)
xyz_lines = xyz_lines[-nsites_mm:] xyz_lines = xyz_lines[-nsites_mm:]
@@ -379,9 +379,9 @@ class DiceInterface(Interface):
) )
if exit_status != 0: if exit_status != 0:
raise Exception(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly") raise RuntimeError(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly")
with open(Path(file_name + ".out"), 'r') as outfile: with open(Path(file_name + ".out"), 'r') as outfile:
flag = outfile.readlines()[DICE_FLAG_LINE].strip() flag = outfile.readlines()[DICE_FLAG_LINE].strip()
if flag != DICE_END_FLAG: if flag != DICE_END_FLAG:
raise Exception(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly") raise RuntimeError(f"Dice process step{cycle:02d}-p{proc:02d} did not exit properly")

View File

@@ -0,0 +1,39 @@
import logging
class Logger:
outfile = None
_logger = None
_instance = None
def __new__(cls, *args, **kwargs):
if not getattr(cls, '_instance'):
cls._instance = super(Logger, cls).__new__(cls)
return cls._instance
def set_logger(self, logger_name, outfile='run.log', level=logging.INFO):
self.outfile = outfile
self._logger = logging.getLogger(logger_name)
if level is not None:
self._logger.setLevel(level)
self._create_handlers()
def _create_handlers(self):
handlers = []
if self.outfile is not None:
handlers.append(logging.FileHandler(self.outfile, mode='a+'))
else:
handlers.append(logging.StreamHandler())
for handler in handlers:
handler.setFormatter(logging.Formatter('%(message)s'))
self._logger.addHandler(handler)
def close(self):
for handler in self._logger.handlers:
handler.close()
self._logger.removeHandler(handler)

View File

@@ -1,15 +0,0 @@
def NotNull(requiredArgs=[]):
def _NotNull(function):
def wrapper(*args, **kwargs):
for arg in requiredArgs:
try:
assert (
kwargs.get(arg) is not None
), "Invalid Config File. Keyword {} is required".format(arg)
except AssertionError as err:
print(err)
return function(*args, **kwargs)
return wrapper
return _NotNull

View File

@@ -1,3 +1,5 @@
import numpy as np
from diceplayer.shared.environment.molecule import Molecule from diceplayer.shared.environment.molecule import Molecule
from diceplayer.shared.environment.atom import Atom from diceplayer.shared.environment.atom import Atom
@@ -145,3 +147,66 @@ class TestMolecule(unittest.TestCase):
expected_charges, expected_charges,
actual_charges actual_charges
) )
def test_sizes_of_molecule(self):
mol = Molecule('test')
mol.add_atom(
Atom(lbl=1, na=1, rx=0.0, ry=0.0, rz=0.0, chg=1.0, eps=1.0, sig=1.0)
)
sizes = mol.sizes_of_molecule()
expected_sizes = [0.0, 0.0, 0.0]
npt.assert_equal(sizes, expected_sizes)
def test_standard_orientation(self):
mol = Molecule('test')
mol.add_atom(
Atom(lbl=1, na=1, rx=1.0, ry=1.0, rz=1.0, chg=1.0, eps=1.0, sig=1.0)
)
mol.standard_orientation()
expected_position = [0.0, 0.0, 0.0]
self.assertEqual(mol.read_position().tolist(), expected_position)
def test_translate(self):
mol = Molecule('test')
mol.add_atom(
Atom(lbl=1, na=1, rx=1.0, ry=1.0, rz=1.0, chg=1.0, eps=1.0, sig=1.0)
)
new_mol = mol.translate(np.array([-1, -1, -1]))
expected_position = [0.0, 0.0, 0.0]
self.assertEqual(
new_mol.read_position().tolist(),
expected_position
)
def test_minimum_distance(self):
mol1 = Molecule('test1')
mol1.add_atom(
Atom(lbl=1, na=1, rx=0.0, ry=0.0, rz=0.0, chg=1.0, eps=1.0, sig=1.0)
)
mol2 = Molecule('test2')
mol2.add_atom(
Atom(lbl=1, na=1, rx=1.0, ry=0.0, rz=0.0, chg=1.0, eps=1.0, sig=1.0)
)
expected_distance = 1.0
actual_distance = mol1.minimum_distance(mol2)
self.assertEqual(expected_distance, actual_distance)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,5 +1,6 @@
from diceplayer.shared.interface.dice_interface import DiceInterface from diceplayer.shared.interface.dice_interface import DiceInterface
from diceplayer.shared.environment.molecule import Molecule from diceplayer.shared.environment.molecule import Molecule
from diceplayer.shared.environment.atom import Atom
from diceplayer.shared.config.step_dto import StepDTO from diceplayer.shared.config.step_dto import StepDTO
import io import io
@@ -563,7 +564,25 @@ class TestDiceInterface(unittest.TestCase):
with self.assertRaises(FileNotFoundError): with self.assertRaises(FileNotFoundError):
dice._run_dice(1, 1) dice._run_dice(1, 1)
def test_make_init_file(self): @mock.patch('builtins.open', new_callable=mock.mock_open)
def test_make_init_file(self, mock_open):
example_atom = Atom(
lbl=1,
na=1,
rx=1.0,
ry=1.0,
rz=1.0,
chg=1.0,
eps=1.0,
sig=1.0,
)
main_molecule = Molecule('main_molecule')
main_molecule.add_atom(example_atom)
secondary_molecule = Molecule('secondary_molecule')
secondary_molecule.add_atom(example_atom)
dice = DiceInterface( dice = DiceInterface(
{ {
'ljname': 'test', 'ljname': 'test',
@@ -581,19 +600,359 @@ class TestDiceInterface(unittest.TestCase):
simulation_dir='test', simulation_dir='test',
altsteps=1, altsteps=1,
molecule=[ molecule=[
main_molecule,
secondary_molecule,
],
nmol=[
len(main_molecule.atom),
len(secondary_molecule.atom),
], ],
nmol=[],
) )
) )
last_xyz_file = io.StringIO() last_xyz_file = io.StringIO()
last_xyz_file.writelines([ last_xyz_file.writelines([
'TEST', ' TEST\n',
'TEST', ' Configuration number : TEST = TEST TEST TEST\n',
'TEST', ' H 1.00000 1.00000 1.00000\n',
'TEST', ' H 1.00000 1.00000 1.00000\n',
]) ])
last_xyz_file.seek(0)
dice._make_init_file('test', last_xyz_file)
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = [
' 1.000000 1.000000 1.000000\n',
' 1.000000 1.000000 1.000000\n',
'$end'
]
self.assertEqual(lines, expected_lines)
@mock.patch('builtins.open', new_callable=mock.mock_open)
def test_new_density(self, mock_open):
example_atom = Atom(
lbl=1,
na=1,
rx=1.0,
ry=1.0,
rz=1.0,
chg=1.0,
eps=1.0,
sig=1.0,
)
main_molecule = Molecule('main_molecule')
main_molecule.add_atom(example_atom)
secondary_molecule = Molecule('secondary_molecule')
secondary_molecule.add_atom(example_atom)
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[
main_molecule,
secondary_molecule,
],
nmol=[
len(main_molecule.atom),
len(secondary_molecule.atom),
],
)
)
last_xyz_file = io.StringIO()
last_xyz_file.writelines([
' TEST\n',
' Configuration number : TEST = 1 1 1\n',
' H 1.00000 1.00000 1.00000\n',
' H 1.00000 1.00000 1.00000\n',
])
last_xyz_file.seek(0)
density = dice._new_density(last_xyz_file)
self.assertEqual(density, 3.3472359000000003)
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch('diceplayer.shared.interface.dice_interface.random')
def test_make_nvt_ter(self, mock_random, mock_open):
mock_random.random.return_value = 1
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[],
nmol=[],
)
)
dice._make_nvt_ter(1, 'test')
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = ['title = Diceplayer run - NVT Thermalization\n', 'ncores = 1\n', 'ljname = test\n', 'outname = test\n', 'nmol = 1\n', 'dens = 1.0\n', 'temp = 300.0\n', 'init = yes\n', 'nstep = 1\n', 'vstep = 0\n', 'mstop = 1\n', 'accum = no\n', 'iprint = 1\n', 'isave = 0\n', 'irdf = 0\n', 'seed = 1000000\n', 'upbuf = 360']
self.assertEqual(lines, expected_lines)
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch('diceplayer.shared.interface.dice_interface.random')
def test_make_nvt_eq(self, mock_random, mock_open):
mock_random.random.return_value = 1
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[],
nmol=[],
)
)
dice._make_nvt_eq('test')
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = ['title = Diceplayer run - NVT Production\n', 'ncores = 1\n', 'ljname = test\n', 'outname = test\n', 'nmol = 1\n', 'dens = 1.0\n', 'temp = 300.0\n', 'init = no\n', 'nstep = 1\n', 'vstep = 0\n', 'mstop = 1\n', 'accum = no\n', 'iprint = 1\n', 'isave = 1000\n', 'irdf = 10\n', 'seed = 1000000\n']
self.assertEqual(lines, expected_lines)
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch('diceplayer.shared.interface.dice_interface.random')
def test_make_npt_ter(self, mock_random, mock_open):
mock_random.random.return_value = 1
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[],
nmol=[],
)
)
dice._make_npt_ter(1, 'test')
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = ['title = Diceplayer run - NPT Thermalization\n', 'ncores = 1\n', 'ljname = test\n', 'outname = test\n', 'nmol = 1\n', 'press = 1.0\n', 'temp = 300.0\n', 'init = no\n', 'vstep = 0\n', 'nstep = 5\n', 'mstop = 1\n', 'accum = no\n', 'iprint = 1\n', 'isave = 0\n', 'irdf = 0\n', 'seed = 1000000\n']
self.assertEqual(lines, expected_lines)
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch('diceplayer.shared.interface.dice_interface.random')
def test_make_npt_eq(self, mock_random, mock_open):
mock_random.random.return_value = 1
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[],
nmol=[],
)
)
dice._make_npt_eq('test')
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = ['title = Diceplayer run - NPT Production\n', 'ncores = 1\n', 'ljname = test\n', 'outname = test\n', 'nmol = 1\n', 'press = 1.0\n', 'temp = 300.0\n', 'nstep = 5\n', 'vstep = 0\n', 'init = no\n', 'mstop = 1\n', 'accum = no\n', 'iprint = 1\n', 'isave = 1000\n', 'irdf = 10\n', 'seed = 1000000\n']
self.assertEqual(lines, expected_lines)
@mock.patch('builtins.open', new_callable=mock.mock_open)
def test_make_potentials(self, mock_open):
example_atom = Atom(
lbl=1,
na=1,
rx=1.0,
ry=1.0,
rz=1.0,
chg=1.0,
eps=1.0,
sig=1.0,
)
main_molecule = Molecule('main_molecule')
main_molecule.add_atom(example_atom)
secondary_molecule = Molecule('secondary_molecule')
secondary_molecule.add_atom(example_atom)
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
dice.configure(
StepDTO(
ncores=1,
nprocs=1,
simulation_dir='test',
altsteps=1,
molecule=[
main_molecule,
secondary_molecule,
],
nmol=[
len(main_molecule.atom),
len(secondary_molecule.atom),
],
)
)
dice._make_potentials('test')
mock_handler = mock_open()
calls = mock_handler.write.call_args_list
lines = list(map(lambda x: x[0][0], calls))
expected_lines = ['*\n', '2\n', '1 main_molecule\n', '1 1 1.00000 1.00000 1.00000 1.000000 1.00000 1.0000\n', '1 secondary_molecule\n', '1 1 1.00000 1.00000 1.00000 1.000000 1.00000 1.0000\n']
self.assertEqual(lines, expected_lines)
@mock.patch('diceplayer.shared.interface.dice_interface.subprocess')
@mock.patch('builtins.open', new_callable=mock.mock_open, read_data='End of simulation\nBLABLA')
def test_run_dice_file(self, mock_open, mock_subprocess):
mock_subprocess.call.return_value = 0
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
dice.run_dice_file(1, 1, 'test')
self.assertTrue(mock_subprocess.call.called)
self.assertTrue(mock_open.called)
@mock.patch('diceplayer.shared.interface.dice_interface.subprocess')
@mock.patch('builtins.open', new_callable=mock.mock_open, read_data='Error\nBLABLA')
def test_run_dice_file_raises_runtime_error_on_dice_file(self, mock_open, mock_subprocess):
mock_subprocess.call.return_value = 0
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
with self.assertRaises(RuntimeError):
dice.run_dice_file(1, 1, 'test')
@mock.patch('diceplayer.shared.interface.dice_interface.subprocess')
@mock.patch('builtins.open', new_callable=mock.mock_open, read_data='End of simulation\nBLABLA')
def test_run_dice_file_raises_runtime_error_of_dice_exit_code(self, mock_open, mock_subprocess):
mock_subprocess.call.return_value = 1
dice = DiceInterface(
{
'ljname': 'test',
'outname': 'test',
'ncores': 1,
'dens': 1.0,
'nmol': [1],
'nstep': [1, 1],
}
)
with self.assertRaises(RuntimeError):
dice.run_dice_file(1, 1, 'test')
if __name__ == '__main__': if __name__ == '__main__':

View File

View File

@@ -0,0 +1,34 @@
from diceplayer.shared.utils.logger import Logger
import unittest
class TestLogger(unittest.TestCase):
def test_class_instantiation(self):
logger = Logger()
self.assertIsInstance(logger, Logger)
def test_singleton(self):
logger1 = Logger()
logger2 = Logger()
self.assertIs(logger1, logger2)
def test_set_logger(self):
logger = Logger()
logger.set_logger('test_logger')
self.assertIsNotNone(logger._logger)
self.assertEqual(logger._logger.name, 'test_logger')
def test_close(self):
logger = Logger()
logger.set_logger('test_logger')
logger.close()
self.assertEqual(len(logger._logger.handlers), 0)
if __name__ == '__main__':
unittest.main()