Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 140 additions & 39 deletions src/scm/plams/interfaces/adfsuite/amsworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
from scm.amspipe import AMSPipeError
import psutil

try:
from scm.base import ChemicalSystem

_has_scm_chemsys = True
except ImportError:
_has_scm_chemsys = False

T = TypeVar("T")
TSelf = TypeVar("TSelf", bound="AMSWorker")
TPool = TypeVar("TPool", bound="AMSWorkerPool")
Expand Down Expand Up @@ -141,15 +148,22 @@ class AMSWorkerResults:
def __init__(
self,
name: str,
molecule: "Molecule",
molecule: Union["Molecule", "ChemicalSystem"],
results: Dict[str, Any],
error: Optional[Union["AMSPipeError", "AMSWorkerError"]] = None,
):
self._input_molecule: Optional["Molecule"] = None
self._input_system: Optional["ChemicalSystem"] = None
if _has_scm_chemsys and isinstance(molecule, ChemicalSystem):
self._input_system = molecule
else:
self._input_molecule = molecule

self._name = name
self._input_molecule = molecule
self.error = error
self._results = results
self._main_molecule: Optional["Molecule"] = None
self._main_system: Optional["ChemicalSystem"] = None
self._main_ase_atoms: Optional["ASEAtoms"] = None

@property
Expand All @@ -172,7 +186,7 @@ def ok(self) -> bool:
return self.error is None

def get_errormsg(self) -> Optional[str]:
"""Attempts to retreive a human readable error message from a crashed job. Returns ``None`` for jobs without errors."""
"""Attempts to retrieve a human readable error message from a crashed job. Returns ``None`` for jobs without errors."""
if self.ok():
return None
else:
Expand Down Expand Up @@ -261,23 +275,54 @@ def get_input_molecule(self) -> "Molecule":

Note that this method may also be used if the calculation producing this |AMSWorkerResults| object has failed, i.e. :meth:`ok` is ``False``.
"""
return self._input_molecule
if self._input_molecule is None:
system_block = str(self._input_system)
ams_job = AMSJob.from_input(text_input=system_block)
self._input_molecule = list(ams_job.molecule.values())[0] # type: ignore[operator,union-attr]

return self._input_molecule.copy()

@requires_optional_package("scm.base")
def get_input_system(self) -> "ChemicalSystem":
"""Return a ``ChemicalSystem`` instance with the coordinates passed into the |AMSWorker|.

Note that this method may also be used if the calculation producing this |AMSWorkerResults| object has failed, i.e. :meth:`ok` is ``False``.
"""
if self._input_system is None:
ams_job = AMSJob(molecule=self.get_input_molecule())
system_block = ams_job.get_input()
self._input_system = ChemicalSystem(system_block)

return self._input_system.copy()

@_restrict
def get_main_molecule(self) -> "Molecule":
"""Return a |Molecule| instance with the final coordinates."""
if self._main_molecule is None:
self._main_molecule = self.get_input_molecule()
if self._results is not None and "xyzAtoms" in self._results:
self._main_molecule = self._input_molecule.copy()
self._main_molecule.from_array(self._results.get("xyzAtoms") * Units.conversion_ratio("au", "Angstrom")) # type: ignore[operator]
if "latticeVectors" in self._results:
self._main_molecule.lattice = [
list(v) for v in self._results.get("latticeVectors") * Units.conversion_ratio("au", "Angstrom") # type: ignore[operator,union-attr]
]
else:
self._main_molecule = self._input_molecule

return self._main_molecule
return self._main_molecule.copy()

@_restrict
@requires_optional_package("scm.base")
def get_main_system(self) -> "ChemicalSystem":
"""Return a ``ChemicalSystem`` instance with the final coordinates."""
if self._main_system is None:
self._main_system = self.get_input_system()
if self._results is not None and "xyzAtoms" in self._results:
self._main_system.coords = self._results.get("xyzAtoms") * Units.conversion_ratio("au", "Angstrom") # type: ignore[operator]
if "latticeVectors" in self._results and self._results.get("latticeVectors").size > 0: # type: ignore[union-attr]
self._main_system.lattice.vectors = [
list(v) for v in self._results.get("latticeVectors") * Units.conversion_ratio("au", "Angstrom") # type: ignore[operator,union-attr]
]

return self._main_system.copy()

@_restrict
@requires_optional_package("ase")
Expand All @@ -299,7 +344,7 @@ def get_main_ase_atoms(self) -> "ASEAtoms":
cell[: lattice.shape[0], : lattice.shape[1]] = lattice * Units.conversion_ratio(
"au", "Angstrom"
)
atomsymbols = [at.symbol for at in self._input_molecule]
atomsymbols = [at.symbol for at in self.get_input_molecule()]
positions = np.array(self._results["xyzAtoms"]).reshape(-1, 3) * Units.conversion_ratio(
"au", "Angstrom"
)
Expand Down Expand Up @@ -857,7 +902,9 @@ def _args_to_settings(**kwargs: Any) -> Settings:
s.set_nested(_arg2setting[key], val)
return s

def _solve_from_settings(self, name: str, molecule: "Molecule", settings: Settings) -> AMSWorkerResults:
def _solve_from_settings(
self, name: str, molecule: Union["Molecule", "ChemicalSystem"], settings: Settings
) -> AMSWorkerResults:
args = AMSWorker._settings_to_args(settings)
if args["task"].lower() == "geometryoptimization":
args["gradients"] = True # need to explicitly set gradients to True to get them in the AMSWorkerResults
Expand All @@ -868,7 +915,7 @@ def _solve_from_settings(self, name: str, molecule: "Molecule", settings: Settin
def _solve(
self,
name: str,
molecule: "Molecule",
molecule: Union["Molecule", "ChemicalSystem"],
task: str,
prev_results: Optional[Union[AMSWorkerResults, AMSWorkerMDState]] = None,
quiet: bool = True,
Expand Down Expand Up @@ -983,34 +1030,64 @@ def _solve(
# ... and return an AMSWorkerResults object indicating our failure.
return AMSWorkerResults(name, molecule, {}, exc)

def _prepare_system(self, molecule: "Molecule") -> None:
def _prepare_system(self, molecule: Union["Molecule", "ChemicalSystem"]) -> None:
# This is a good opportunity to let the worker process know about all the results we no longer need ...
self._prune_restart_cache()

chemicalSystem: Dict[str, Any] = {}
chemicalSystem["atomSymbols"] = np.asarray([atom.symbol for atom in molecule])
chemicalSystem["coords"] = molecule.as_array() * Units.conversion_ratio("Angstrom", "Bohr")
if "charge" in molecule.properties:
chemicalSystem["totalCharge"] = float(molecule.properties.charge)
angstrom_to_bohr = Units.conversion_ratio("Angstrom", "Bohr")

if _has_scm_chemsys and isinstance(molecule, ChemicalSystem):
symbols = np.asarray([atom.symbol for atom in molecule.atoms])
coords = np.asarray(molecule.coords)
charge = molecule.charge
atomicInfo = [molecule.atom_EOL_string(i) for i in range(molecule.num_atoms)]
cell = np.asarray(molecule.lattice.vectors) if molecule.has_lattice() else None

if molecule.has_bonds():
bonds = np.asarray([[iat + 1, jat + 1] for iat, jat, _ in molecule.bonds], dtype=int)
bond_orders = np.asarray([bond.order for _, _, bond in molecule.bonds])
else:
bonds = None
bond_orders = None

else:
chemicalSystem["totalCharge"] = 0.0
atomicInfo = [AMSJob._atom_suffix(atom) for atom in molecule]
symbols = np.asarray([atom.symbol for atom in molecule])
coords = molecule.as_array()
charge = float(molecule.properties.charge) if "charge" in molecule.properties else 0.0

atomicInfo = [AMSJob._atom_suffix(atom) for atom in molecule]

cell = np.asarray(molecule.lattice) if molecule.lattice else None

if molecule.bonds:
bonds = np.array([[iat for iat in molecule.index(bond)] for bond in molecule.bonds])
bond_orders = np.asarray([float(bond.order) for bond in molecule.bonds])
else:
bonds = None
bond_orders = None

chemicalSystem["atomSymbols"] = symbols
chemicalSystem["coords"] = coords * angstrom_to_bohr
chemicalSystem["totalCharge"] = charge

if any(ai != "" for ai in atomicInfo):
chemicalSystem["atomicInfo"] = np.asarray(atomicInfo)
if molecule.lattice:
cell = np.asarray(molecule.lattice) * Units.conversion_ratio("Angstrom", "Bohr")
chemicalSystem["latticeVectors"] = cell
if molecule.bonds:
chemicalSystem["bonds"] = np.array([[iat for iat in molecule.index(bond)] for bond in molecule.bonds])
if len(chemicalSystem["bonds"]) == 0:
chemicalSystem["bonds"] = np.zeros((0, 2))
chemicalSystem["bondOrders"] = np.asarray([float(bond.order) for bond in molecule.bonds])

if cell is not None:
chemicalSystem["latticeVectors"] = cell * angstrom_to_bohr

if bonds is not None:
bonds = bonds if len(bonds) > 0 else np.zeros((0, 2))
chemicalSystem["bonds"] = bonds
chemicalSystem["bondOrders"] = bond_orders

self._call("SetSystem", chemicalSystem)

def SinglePoint(
self,
name: str,
molecule: "Molecule",
molecule: Union["Molecule", "ChemicalSystem"],
prev_results: Optional[AMSWorkerResults] = None,
quiet: bool = True,
gradients: bool = False,
Expand All @@ -1021,7 +1098,7 @@ def SinglePoint(
dipolemoment: bool = False,
dipolegradients: bool = False,
) -> AMSWorkerResults:
"""Performs a single point calculation on the geometry given by the |Molecule| instance *molecule* and returns an instance of |AMSWorkerResults| containing the results.
"""Performs a single point calculation on the geometry given by the system *molecule* (|Molecule| or ``ChemicalSystem``) and returns an instance of |AMSWorkerResults| containing the results.

Every calculation should be given a *name*. Note that the name **must be unique** for this |AMSWorker| instance: One should not attempt to reuse calculation names with a given instance of |AMSWorker|.

Expand Down Expand Up @@ -1062,7 +1139,7 @@ def SinglePoint(
def GeometryOptimization(
self,
name: str,
molecule: "Molecule",
molecule: Union["Molecule", "ChemicalSystem"],
prev_results: Optional[AMSWorkerResults] = None,
quiet: bool = True,
gradients: bool = True,
Expand All @@ -1086,7 +1163,7 @@ def GeometryOptimization(
convstressenergyperatom: Optional[float] = None,
constraints: Optional[Settings] = None,
) -> AMSWorkerResults:
"""Performs a geometry optimization on the |Molecule| instance *molecule* and returns an instance of |AMSWorkerResults| containing the results from the optimized geometry.
"""Performs a geometry optimization on the system *molecule* (|Molecule| or ``ChemicalSystem``) and returns an instance of |AMSWorkerResults| containing the results from the optimized geometry.

The geometry optimizer can be controlled using the following keyword arguments:

Expand Down Expand Up @@ -1156,7 +1233,7 @@ def MolecularDynamics(
self._start_subprocess()
raise

def CreateMDState(self, name: str, molecule: "Molecule") -> None:
def CreateMDState(self, name: str, molecule: Union["Molecule", "ChemicalSystem"]) -> None:
try:

self._prepare_system(molecule)
Expand Down Expand Up @@ -1398,7 +1475,10 @@ def __enter__(self: TPool) -> TPool:
return self

def _solve_from_settings(
self, items: Sequence[Tuple[str, "Molecule", Settings]], watch: bool = False, watch_interval: int = 60
self,
items: Sequence[Tuple[str, Union["Molecule", "ChemicalSystem"], Settings]],
watch: bool = False,
watch_interval: int = 60,
) -> List[Optional[AMSWorkerResults]]:
"""Request to pool to execute calculations for all items in the iterable *items*. Returns a list of |AMSWorkerResults| objects.

Expand Down Expand Up @@ -1491,10 +1571,17 @@ def _progress_monitor(pd: Dict[str, Any], t: int) -> None:
log(f"{str(num_done).rjust(width)} / {pd['num_jobs']} jobs finished:{percent_done:5.1f}%{trem}")

def _prep_solve_from_settings(
self, method: str, items: Sequence[Union[Tuple[str, "Molecule"], Tuple[str, "Molecule", Dict[str, Any]]]]
) -> List[Tuple[str, "Molecule", Settings]]:
self,
method: str,
items: Sequence[
Union[
Tuple[str, Union["Molecule", "ChemicalSystem"]],
Tuple[str, Union["Molecule", "ChemicalSystem"], Dict[str, Any]],
]
],
) -> List[Tuple[str, Union["Molecule", "ChemicalSystem"], Settings]]:

solve_items: List[Tuple[str, "Molecule", Settings]] = []
solve_items: List[Tuple[str, Union["Molecule", "ChemicalSystem"], Settings]] = []
for item in items:
if len(item) == 2:
name, mol = item
Expand All @@ -1514,7 +1601,12 @@ def _prep_solve_from_settings(

def SinglePoints(
self,
items: Sequence[Union[Tuple[str, "Molecule"], Tuple[str, "Molecule", Dict[str, Any]]]],
items: Sequence[
Union[
Tuple[str, Union["Molecule", "ChemicalSystem"]],
Tuple[str, Union["Molecule", "ChemicalSystem"], Dict[str, Any]],
]
],
watch: bool = False,
watch_interval: int = 60,
) -> List[Optional[AMSWorkerResults]]:
Expand All @@ -1524,21 +1616,30 @@ def SinglePoints(

If *watch* is set to ``True``, the AMSWorkerPool will regularly log progress information. The interval between messages can be set with the *watch_interval* argument in seconds.

As an example, the following call would do single point calculations with gradients and (only for periodic systems) stress tensors for all |Molecule| instances in the dictionary ``molecules``.
As an example, the following call would do single point calculations with gradients and (only for periodic systems) stress tensors for all systems in the dictionary ``molecules``.

.. code-block:: python

results = pool.SinglePoint([ (name, molecules[name], {
"gradients": True,
"stresstensor": len(molecules[name].lattice) != 0
"stresstensor": (
molecules[name].has_lattice()
if hasattr(molecules[name], "has_lattice")
else len(molecules[name].lattice) != 0
)
}) for name in sorted(molecules) ])
"""
solve_items = self._prep_solve_from_settings("SinglePoint", items)
return self._solve_from_settings(solve_items, watch, watch_interval)

def GeometryOptimizations(
self,
items: List[Union[Tuple[str, "Molecule"], Tuple[str, "Molecule", Dict[str, Any]]]],
items: List[
Union[
Tuple[str, Union["Molecule", "ChemicalSystem"]],
Tuple[str, Union["Molecule", "ChemicalSystem"], Dict[str, Any]],
]
],
watch: bool = False,
watch_interval: int = 60,
) -> List[Optional[AMSWorkerResults]]:
Expand Down
Loading
Loading