"""Store the results of a calculation.

This module defines the :class:`~autojob.tasks.calculation.Calculation`,
:class:`~autojob.tasks.calculation.CalculationInputs`, and
:class:`~autojob.tasks.calculation.CalculationOutputs` classes. Instances
of these classes represent the results of a calculation, its inputs, and its
outputs, respectively.

For building the respective documents from a folder, the
:class:`~autojob.tasks.calculation.Calculation` and
:class:`~autojob.tasks.calculation.CalculationOutputs` classes are
:class:`PathLoadable`.

Example:
    .. code-block:: python

        from autojob.tasks.calculation import Calculation

        src = "path/to/calculation/directory"
        results = Calculation.from_directory(src)
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Self
import numpy as np
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import FieldSerializationInfo
from pydantic import SerializerFunctionWrapHandler
from pydantic import field_serializer
from autojob import SETTINGS
from autojob.bases.task_base import TaskOutcome
from autojob.hpc import JobState
from autojob.hpc import ScheduledMixin
from autojob.hpc import SchedulerInputs
from autojob.hpc import SchedulerOutputs
from autojob.plugins import get_harvester
from autojob.tasks.task import Task
from autojob.tasks.task import TaskOutputs
from autojob.utils.atoms import copy_atom_metadata
from autojob.utils.files import template_script
if TYPE_CHECKING:
from ase import Atoms
from ase.calculators.calculator import Calculator
logger = logging.getLogger(__name__)
# TODO: Remove or move to vasp code
# File names and glob patterns. How these lists are consumed is not visible
# in this part of the module -- presumably they select the files to carry
# over to a follow-up calculation (verify against callers).
FILES_TO_COPY = [
    "CHGCAR",
    "*py",
    "*cif",
    "POSCAR",
    "coord",
    "*xyz",
    "*.traj",
    "CONTCAR",
    "*.pkl",
    "*xml",
    "WAVECAR",
    "*.com",
    "*.chk",
]

# File names and glob patterns; presumably the files that may be removed
# from a calculation directory (verify against callers).
FILES_TO_DELETE = [
    "*.d2e",
    "*.int",
    "*.rwf",
    "*.skr",
    "*.inp",
    "EIGENVAL",
    "IBZKPT",
    "PCDAT",
    "PROCAR",
    "ELFCAR",
    "LOCPOT",
    "PROOUT",
    "TMPCAR",
    "vasp.dipcor",
]

# An argument specification: (input posargs, input kwargs)
ArgSpec = tuple[list[Any], dict[str, Any]]
def _harvest_calculator_results(
    src: str | Path,
    calculator: str,
    strict_mode: bool,
) -> dict[str, Any]:
    """Harvest calculator results from a calculation directory.

    The harvester registered under ``calculator`` is looked up with
    ``get_harvester`` and called with ``src``. If the lookup raises
    ``ValueError`` or the harvester raises any exception, the failure is
    logged and harvesting is retried once with the ``"default"`` harvester,
    unless ``strict_mode`` is set or ``calculator`` already is ``"default"``,
    in which case the exception is re-raised.

    Args:
        src: The directory of a calculation.
        calculator: The name used to look up the harvester.
        strict_mode: If True, failures are re-raised instead of falling back
            to the ``"default"`` harvester.

    Returns:
        The value returned by the harvester that succeeded.

    Raises:
        ValueError: Propagated from ``get_harvester`` when no fallback is
            permitted.
        Exception: Propagated from the harvester when no fallback is
            permitted.
    """
    logger.info(
        "Harvesting calculator results with %s calculator harvester from "
        "directory: %s",
        calculator,
        src,
    )
    # Retrying only makes sense if we are not already using the default
    # harvester and the caller tolerates partial failures.
    may_fall_back = not strict_mode and calculator != "default"

    try:
        harvest = get_harvester(calculator)
    except ValueError:
        logger.warning("Unable to load %s calculator harvester.", calculator)
        if may_fall_back:
            return _harvest_calculator_results(src, "default", strict_mode)
        raise

    try:
        harvested = harvest(src)
        logger.info(
            "Successfully loaded %s calculator results from directory: %s.",
            calculator,
            src,
        )
    except Exception:
        logger.exception(
            "Unable to load %s calculator results from directory: %s. ",
            calculator,
            src,
        )
        if may_fall_back:
            return _harvest_calculator_results(src, "default", strict_mode)
        raise

    return harvested
# TODO: Implement
# When complete, this function should return a list of dictionaries. At least,
# each dictionary should contain an image in the optimization trajectory and
# whether the optimization converged.
def _harvest_optimizer_results(
    src: str | Path,
    optimizer: str | None,
    strict_mode: bool,  # noqa: ARG001
) -> list[dict[str, Any]]:
    """Harvest optimizer results from a calculation directory.

    This is currently a placeholder: it only logs the request and always
    returns an empty list.

    Args:
        src: The directory of a calculation.
        optimizer: The name of the optimizer, or None. It is only used in
            the log message for now.
        strict_mode: Currently unused.

    Returns:
        An empty list.
    """
    logger.info(
        "Harvesting %s optimizer results from directory: %s.",
        optimizer,
        src,
    )
    return []
def _harvest_analysis_results(
    src: str | Path,
    analyses: list[str],
    strict_mode: bool,
) -> dict[str, Any]:
    """Harvest the results of post-calculation analyses from a directory.

    For every name in ``analyses`` the matching harvester is looked up with
    ``get_harvester`` and called with ``src``. Outside of strict mode, an
    analysis whose harvester cannot be loaded or whose harvester raises is
    logged and skipped so that the remaining analyses are still harvested.

    Args:
        src: The directory of a calculation.
        analyses: The names of the analyses to harvest.
        strict_mode: If True, the first failure is re-raised.

    Returns:
        A dictionary mapping the name of each successfully harvested
        analysis to the value returned by its harvester.

    Raises:
        Exception: In strict mode, whatever ``get_harvester`` or the
            harvester raised.
    """
    logger.debug("Harvesting analysis results from directory: %s", src)
    analysis_results: dict[str, Any] = {}

    for analysis in analyses:
        try:
            # The lookup lives inside the ``try`` so that an unknown analysis
            # name is subject to ``strict_mode`` like any other harvesting
            # failure, mirroring ``_harvest_calculator_results``.
            harvester = get_harvester(analysis)
            logger.debug(
                "Harvesting %s analysis results from directory: %s",
                analysis,
                src,
            )
            analysis_results[analysis] = harvester(src)
        except Exception:
            if strict_mode:
                raise
            logger.exception(
                "Unable to harvest %s analysis results from directory: %s.",
                analysis,
                src,
            )
        else:
            logger.info(
                "Successfully harvested %s analysis results from directory: %s",
                analysis,
                src,
            )

    return analysis_results
class CalculationOutputs(BaseModel):
    """The outputs of a calculation.

    Fields that are not declared on the model are accepted as well
    (``extra="allow"``).
    """

    energy: float | None = Field(
        default=None,
        description="Total energy in units of eV.",
    )
    forces: list[list[float]] | None = Field(
        default=None,
        description="The force on each atom in units of eV/Å.",
    )
    # ? Rename to completed and use calculator_results to store calculator-
    # ? specific convergence results?
    converged: bool = Field(
        default=False,
        description="Whether or not the calculaton has converged",
    )
    calculator_results: dict[str, Any] | None = Field(
        default=None,
        description="Calculator-specific results in excess of "
        "model-level fields",
    )
    optimizer_results: list[dict[str, Any]] | None = Field(
        default=None,
        description="A list of dictionaries, each containing "
        "optimizer results from a step in the optimization",
    )
    analysis_results: dict[str, dict[str, Any]] | None = Field(
        default=None,
        description="A dictionary mapping post-calculation analysis names to "
        "their results",
    )

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow")

    @field_serializer("calculator_results", mode="wrap")
    def serialize_calculator_results(
        self,
        v: dict[str, Any] | None,
        _: SerializerFunctionWrapHandler,
        info: FieldSerializationInfo,
    ) -> dict[str, Any] | None:
        """Serialize the calculator results.

        In JSON mode, NumPy arrays and NumPy scalars are converted to plain
        Python lists and scalars, including those nested inside
        dictionaries, lists, and tuples. In any other mode the value is
        returned unchanged.

        Args:
            v: The calculator results to serialize.
            _: The wrapped serialization handler (unused).
            info: Serialization metadata; only ``info.mode`` is read.

        Returns:
            None if ``v`` is None, a converted copy of ``v`` in JSON mode,
            and ``v`` itself otherwise.
        """
        if v is None or info.mode != "json":
            return v

        def to_builtin(value: Any) -> Any:
            # Replace NumPy containers/scalars with built-in equivalents.
            if isinstance(value, np.ndarray):
                return value.tolist()
            if isinstance(value, np.generic):
                return value.item()
            if isinstance(value, dict):
                return {key: to_builtin(item) for key, item in value.items()}
            if isinstance(value, list):
                return [to_builtin(item) for item in value]
            if isinstance(value, tuple):
                return tuple(to_builtin(item) for item in value)
            return value

        return {k: to_builtin(value) for k, value in v.items()}

    # TODO: Add optimizer harvester
    @classmethod
    def from_directory(
        cls,
        *,
        src: str | Path,
        calculator: str | None = None,
        optimizer: str | None = None,
        analyses: list[str] | None = None,
        strict_mode: bool | None = None,
    ) -> Self:
        """Retrieve calculation outputs from a calculation directory.

        Args:
            src: The directory of a calculation.
            calculator: The name of the ASE calculator used to perform the
                calculation. This will be used to determine which harvester
                plugin will be used to retrieve the calculator-specific
                results. Defaults to the harvester defined in
                :mod:`autojob.harvest.harvester.default`.
            optimizer: The name of the ASE optimizer used to perform the
                calculation. Defaults to None, in which case no optimizer
                results are harvested. *This argument is not yet implemented.*
            analyses: A list of post-calculation analyses whose results are to
                be harvested. Defaults to an empty list.
            strict_mode: Whether or not to require all outputs. If True,
                errors will be thrown on missing outputs. Defaults to
                ``SETTINGS.STRICT_MODE``.

        Returns:
            A CalculationOutputs object (or an instance of the subclass on
            which this method is called).
        """
        if strict_mode is None:
            strict_mode = SETTINGS.STRICT_MODE

        analyses = analyses or []
        calculator = calculator or "default"

        logger.debug("Loading calculation outputs from directory: %s", src)
        logger.debug("Strict mode: %sabled", "en" if strict_mode else "dis")

        calc_results = _harvest_calculator_results(
            src, calculator, strict_mode
        )
        # As documented above, nothing is harvested without an optimizer.
        opt_results = (
            []
            if optimizer is None
            else _harvest_optimizer_results(src, optimizer, strict_mode)
        )
        analysis_results = _harvest_analysis_results(
            src,
            analyses,
            strict_mode,
        )

        # The harvested calculator results are spread into the constructor,
        # so their keys populate the model fields directly.
        calculation_outputs = cls(
            **calc_results,
            analysis_results=analysis_results,
            optimizer_results=opt_results,
        )
        logger.info(
            "Successfully loaded calculation outputs from directory: %s",
            src,
        )
        return calculation_outputs
class Calculation(Task, ScheduledMixin):
    """A record representing a calculation."""

    calculation_inputs: CalculationInputs = Field(
        default_factory=CalculationInputs,
        description="The inputs of the calculation",
    )
    calculation_outputs: CalculationOutputs | None = Field(
        default=None, description="The calculation outputs"
    )

    # TODO: Write unit test for copying metadata
    @staticmethod
    def patch_task(
        *,
        task_outputs: TaskOutputs | None,
        input_atoms: Atoms | None,
        output_atoms: Atoms | None,
        state: JobState,
        converged: bool,
    ) -> None:
        """Patch task outputs using Calculation values.

        Note that this method modifies ``task_outputs`` in place. The
        following attributes are patched:

        - ``task_outputs.atoms``: if unset, replaced with ``output_atoms``
          after copying metadata from ``input_atoms`` (when available)

        - ``task_outputs.outcome``: set according to ``converged`` and
          ``state``

        Args:
            task_outputs: The :class:`~base_task.TaskOutputs` to be patched.
                If None, nothing is done.
            input_atoms: An Atoms object representing the input geometry.
            output_atoms: An Atoms object representing the output geometry.
            state: The state of the scheduler job.
            converged: Whether or not the Calculation converged.
        """
        if task_outputs is None:
            logger.info("No task outputs to patch in task")
            return

        if task_outputs.atoms is None and output_atoms:
            logger.debug("Patching output atoms")
            # ``input_atoms`` may legitimately be None (e.g., when the task
            # has no input geometry); there is then no metadata to copy.
            if input_atoms is not None:
                copy_atom_metadata(input_atoms, output_atoms)
            task_outputs.atoms = output_atoms

        # Success requires both a finished (or unknown-state) job and a
        # converged calculation.
        if state in (JobState.COMPLETED, JobState.UNKNOWN) and converged:
            task_outputs.outcome = TaskOutcome.SUCCESS
        else:
            task_outputs.outcome = TaskOutcome.FAILED

        logger.debug(f"Task outcome: {task_outputs.outcome}")

    @classmethod
    def from_directory(cls, src: str | Path, **kwargs: Any) -> Self:
        """Generate a ``Calculation`` document from a task directory.

        Args:
            src: The directory of a calculation.
            kwargs: Additional keyword arguments:

                - strict_mode: Whether or not to fail on any error. Defaults to
                  `SETTINGS.STRICT_MODE`.
                - magic_mode: Whether or not to instantiate subclasses. If
                  True, the task returned must be an instance determined by
                  metadata in the directory. Defaults to False.

        Returns:
            A :class:`Calculation` or a subclass of a :class:`Calculation`.

        .. seealso::

            :meth:`.task.Task.from_directory`
        """
        strict_mode = kwargs.get("strict_mode", SETTINGS.STRICT_MODE)
        magic_mode = kwargs.get("magic_mode", False)

        logger.debug("Loading calculation from directory: %s", src)
        logger.debug("Magic mode: %sabled", "en" if magic_mode else "dis")
        logger.debug("Strict mode: %sabled", "en" if strict_mode else "dis")

        if magic_mode:
            return cls.load_magic(src, strict_mode=strict_mode)

        # Task must be explicitly used (instead of super()) because
        # Task.from_directory returns a type determined by cls
        task = Task.from_directory(
            src=src, strict_mode=strict_mode, magic_mode=False
        )

        # The calculation inputs are removed from the task inputs' extra
        # fields and promoted to a dedicated model.
        data = task.task_inputs.model_extra.pop("calculation_inputs", {})
        calc_inputs = CalculationInputs(**data)
        calc_outputs = CalculationOutputs.from_directory(
            src=src,
            calculator=calc_inputs.calculator,
            optimizer=calc_inputs.optimizer,
            analyses=list(calc_inputs.analyses),
            strict_mode=strict_mode,
        )
        sched_inputs = SchedulerInputs.from_directory(src=src)
        sched_outputs = SchedulerOutputs.from_directory(src=src)

        # The output geometry, if any, is moved out of the calculator
        # results so that it can be stored on the task outputs instead.
        if calc_outputs.calculator_results:
            output_atoms = calc_outputs.calculator_results.pop("atoms", None)
        else:
            output_atoms = None

        # When several input geometries are present, the first one is used
        # as the source of metadata.
        if isinstance(task.task_inputs.atoms, list):
            input_atoms = (
                task.task_inputs.atoms[0] if task.task_inputs.atoms else None
            )
        else:
            input_atoms = task.task_inputs.atoms

        cls.patch_task(
            task_outputs=task.task_outputs,
            input_atoms=input_atoms,
            output_atoms=output_atoms,
            state=sched_outputs.state,
            converged=calc_outputs.converged,
        )
        logger.debug("Successfully loaded calculation from directory: %s", src)
        return cls(
            **task.model_dump(),
            calculation_inputs=calc_inputs,
            calculation_outputs=calc_outputs,
            scheduler_inputs=sched_inputs,
            scheduler_outputs=sched_outputs,
        )

    def write_calculation_script(
        self,
        dest: str | Path,
        *,
        additional_data: dict[str, Any] | None = None,
    ) -> Path:
        """Write the calculation script used to run the task.

        The template context consists of the dumped model, the dumped
        ``SETTINGS`` under the key ``"settings"``, and ``additional_data``;
        keys in ``additional_data`` take precedence.

        Args:
            dest: The directory in which to write the Python script.
            additional_data: A dictionary mapping strings to JSON-serializable
                values to be merged with the calculation inputs that will be
                written to the calculation script. Defaults to an empty
                dictionary.

        Returns:
            A Path object representing the filename of the written calculation
            script.
        """
        logger.debug("Writing calculation script to directory: %s", dest)

        context = {**self.model_dump(), "settings": SETTINGS.model_dump()}
        context |= additional_data or {}

        filename = Path(dest, self.calculation_inputs.calculation_script)
        template_script(
            dest=filename,
            script_template=self.task_inputs.task_script_template,
            context=context,
        )
        logger.debug(
            "Successfully wrote calculation script to file: %s", filename
        )
        return filename

    def write_task_script(
        self,
        dest: str | Path,
        *,
        additional_data: dict[str, Any] | None = None,
    ) -> Path:
        """Write the SLURM input script using the template given.

        The scheduler inputs are dumped in JSON mode (by alias, without
        None values), each key is prefixed with ``--``, and the result is
        passed to the parent implementation under the key
        ``"scheduler_inputs"``.

        Args:
            dest: The directory in which to write the SLURM file.
            additional_data: A dictionary mapping strings to JSON-serializable
                values to be merged with the task inputs that will be written
                to the inputs JSON. Defaults to an empty dictionary.

        Returns:
            A Path representing the filename of the written SLURM script.
        """
        raw_sched_inputs = self.scheduler_inputs.model_dump(
            mode="json", exclude_none=True, by_alias=True
        )
        inputs: dict[str, Any] = {
            "scheduler_inputs": {
                f"--{k}": v for k, v in raw_sched_inputs.items()
            }
        }
        inputs |= additional_data or {}
        return super().write_task_script(dest, additional_data=inputs)