"""Store the results of a calculation.
This module defines the :class:`.autojob.calculation.calculation.Calculation`,
:class:`.autojob.calculation.calculation.CalculationInputs`, and
:class:`.autojob.calculation.calculation.CalculationOutputs` classes. Instances
of these classes represent the results of a calculation, its inputs, and its
outputs, respectively.
For building the respective documents from a folder, each class exposes a
``from_directory()`` method.
Example:
.. code-block:: python
from autojob.calculation.calculation import Calculation
dir_name = "path/to/calculation/directory"
results = Calculation.from_directory(dir_name)
"""
from __future__ import annotations
import ast
from collections.abc import Sequence
from contextlib import suppress
import importlib
import logging
import pathlib
from typing import Any
from typing import ClassVar
from typing import Literal
from typing import Self
from typing import TextIO
from typing import overload
import warnings
from xml.etree import ElementTree
from ase import Atoms
from ase.calculators.calculator import PropertyNotImplementedError
import ase.io
import jinja2
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import ImportString
from pydantic import TypeAdapter
from pydantic import field_validator
from pymatgen.command_line.bader_caller import bader_analysis_from_path
from pymatgen.io.vasp import Kpoints
from autojob import SETTINGS
from autojob import hpc
from autojob.calculation import gaussian
from autojob.calculation.charge import BaderAnalysis
from autojob.calculation.charge import DDEC6Analysis
from autojob.calculation.charge import load_ddec6_data
from autojob.calculation.parameters import CalculatorType
from autojob.calculation.vasp import vasp
from autojob.task import Task
from autojob.utils.files import extract_structure_name
from autojob.utils.files import get_loader
from autojob.utils.parsing import extract_keyword_arguments
logger = logging.getLogger(__name__)
FILES_TO_COPY = [
"CHGCAR",
"*py",
"*cif",
"POSCAR",
"coord",
"*xyz",
"*.traj",
"CONTCAR",
"*.pkl",
"*xml",
"WAVECAR",
"*.com",
"*.chk",
]
FILES_TO_DELETE = [
"*.d2e",
"*.int",
"*.rwf",
"*.skr",
"*.inp",
"EIGENVAL",
"IBZKPT",
"PCDAT",
"PROCAR",
"ELFCAR",
"LOCPOT",
"PROOUT",
"TMPCAR",
"vasp.dipcor",
]
[docs]
class Pseudopotential(BaseModel):
"""A pseudopotential."""
pot_type: str | None = Field(
default=None,
description="Pseudo-potential type, e.g. PAW",
)
functional: str | None = Field(
default=None,
description="Functional type use in the calculation.",
)
symbols: list[str] | None = Field(
default=None,
description="List of VASP potcar symbols used in the calculation.",
)
[docs]
class Analysis(BaseModel):
"""Analysis from a calculation."""
delta_volume: float | None = Field(
default=None,
title="Volume Change",
description="Volume change for the calculation.",
)
delta_volume_percent: float | None = Field(
default=None,
title="Volume Change Percent",
description="Percent volume change for the calculation.",
)
max_force: float | None = Field(
default=None,
title="Max Force",
description="Maximum force on any atom at the end of the calculation.",
)
warnings: list[str] | None = Field(
default=None,
title="Calculation Warnings",
description="Warnings issued after analysis.",
)
errors: list[str] | None = Field(
default=None,
title="Calculation Errors",
description="Errors issued after analysis.",
)
[docs]
def check_bader(parameters: dict[str, Any]) -> list[str]:
"""Peform parameter checks for Bader analysis."""
msgs = []
for kw in ["laechg", "lcharg"]:
if not parameters.get(kw, False):
msgs.append(
f"{kw.upper()} is not to True but `run_bader` is True. "
f"Bader analysis requires {kw.upper()} to be True"
)
if parameters.get("nsw", 0):
msgs.append(
"NSW is non-zero but `run_bader` is True. "
"Bader analysis requires NSW to be zero."
)
return msgs
[docs]
class CalculationOutputs(BaseModel):
"""The outputs of a calculation."""
density: float | None = Field(
default=None, description="Density of in units of g/cc."
)
energy: float | None = Field(
default=None,
description="Total Energy in units of eV.",
)
forces: list[list[float]] | None = Field(
default=None,
description="The force on each atom in units of eV/Å.",
)
stress: list[list[float]] | None = Field(
default=None,
description="The stress on the cell in units of kB.",
)
energy_per_atom: float | None = Field(
default=None,
description="The final DFT energy per atom for the last calculation",
)
bandgap: float | None = Field(
default=None,
description="The DFT bandgap for the last calculation",
)
converged: bool = Field(
default=False,
description="Whether or not the calculaton has converged",
)
analysis: Analysis | None = Field(
default=None,
title="Calculation Analysis",
description="Some analysis of calculation data after collection.",
)
calculation_objects: dict[str, Any] | None = Field(
default=None,
description="Calculation objects returned as outputs of this "
"calculation",
)
bader_analysis: BaderAnalysis | None = Field(
default=None, description="A Bader charge analysis document"
)
ddec6_analysis: DDEC6Analysis | None = Field(
default=None, description="A DDEC6 charge analysis document"
)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow")
# ? How to record Bader charges?
# - attach in Python script: add to Atoms object as charges after running
# - ensure Atoms are serialized with `charges`
[docs]
@classmethod
def from_directory(
cls,
*,
dir_name: str | pathlib.Path,
calculator_type: CalculatorType | None = None,
strict_mode: bool = SETTINGS.STRICT_MODE,
) -> CalculationOutputs:
"""Generate a CalculationOutputs document from a calculation directory.
Args:
dir_name: The directory of a calculation.
calculator_type: The type of calculation run. Must correspond to
an ASE calculator.
strict_mode: Whether or not to require all outputs. If True,
errors will be thrown on missing outputs.
Returns:
A CalculationOutputs object.
"""
logger.debug(f"Loading calculation outputs from {dir_name}")
logger.debug(f"Strict mode {'en' if strict_mode else 'dis'}abled")
# TODO: eliminate try/except by passing down strict_mode
try:
match calculator_type:
case CalculatorType.VASP:
outputs = vasp.load_calculation_outputs(dir_name=dir_name)
case CalculatorType.GAUSSIAN:
outputs = gaussian.load_calculation_outputs(
dir_name=dir_name
)
case None:
outputs = {
"energy": read_energy(dir_name),
"forces": read_forces(dir_name),
}
case _:
msg = (
f"Loading {calculator_type!s} calculation outputs "
"not supported!"
)
raise NotImplementedError(msg)
except (
FileNotFoundError,
NotImplementedError,
ElementTree.ParseError,
):
logger.warning(
f"Unable to read {calculator_type!s} calculation outputs"
)
if strict_mode:
raise
logger.info("Attempting to read generic calculation outputs")
outputs = {
"energy": read_energy(dir_name, strict_mode=strict_mode),
"forces": read_forces(dir_name, strict_mode=strict_mode),
}
outputs["bader_analysis"] = outputs["chargemol_analysis"] = None
with suppress(Exception):
outputs["bader_analysis"] = bader_analysis_from_path(dir_name)
with suppress(Exception):
outputs["chargemol_analysis"] = load_ddec6_data(dir_name)
calculation_outputs = cls(**outputs)
logger.debug(
f"Successfully loaded calculation outputs from {dir_name}"
)
return calculation_outputs
[docs]
class Calculation(Task):
"""A record representing a calculation."""
calculation_inputs: CalculationInputs
calculation_outputs: CalculationOutputs | None = None
scheduler_inputs: hpc.SchedulerInputs
scheduler_outputs: hpc.SchedulerOutputs | None = None
@overload
@staticmethod
def get_output_atoms(
dir_name: str | pathlib.Path,
input_atoms: Atoms,
calculator_type: CalculatorType,
*,
strict_mode: Literal[True],
) -> Atoms: ...
@overload
@staticmethod
def get_output_atoms(
dir_name: str | pathlib.Path,
input_atoms: Atoms,
calculator_type: CalculatorType,
*,
strict_mode: Literal[False],
) -> Atoms | None: ...
[docs]
@staticmethod
def get_output_atoms(
dir_name,
input_atoms,
calculator_type,
*,
strict_mode=SETTINGS.STRICT_MODE,
):
"""Retrieve output Atoms from a Calculation.
Args:
dir_name: The directory of a calculation.
calculator_type: The type of calculation run. Must correspond to
an ASE calculator.
input_atoms: An Atoms object representing the corresponding input
structure.
strict_mode: Whether to raise an error if no output atoms found.
Defaults to ``SETTINGS.STRICT_MODE``.
Raises:
FileNotFoundError: Unable to find output atoms file.
Returns:
An Atoms object representing the output structure.
"""
logger.debug(f"Strict mode {'en' if strict_mode else 'dis'}abled")
try:
match calculator_type:
case CalculatorType.VASP:
atoms = vasp.get_output_atoms(
dir_name=dir_name, input_atoms=input_atoms
)
case CalculatorType.GAUSSIAN:
atoms = gaussian.get_output_atoms(
dir_name=dir_name, input_atoms=input_atoms
)
case _:
msg = (
f"Retrieving output atoms from {calculator_type!s} "
"calculations is not yet supported"
)
raise NotImplementedError(msg)
except (FileNotFoundError, NotImplementedError, StopIteration):
if strict_mode:
raise
logger.warning("Unable to retrieve output atoms")
return None
with dir_name.joinpath(SETTINGS.PYTHON_SCRIPT).open(
mode="r", encoding="utf-8"
) as file:
filename = extract_structure_name(file)
atoms.info["filename"] = filename
return atoms
[docs]
@staticmethod
def get_files_to_carryover(calculator_type: CalculatorType) -> list[str]:
"""Returns a list of strings representing the files to be carried over.
Args:
calculator_type: The type of calculator used in the calculation.
"""
mod = importlib.import_module(f"{__package__}.{calculator_type!s}")
files_to_carryover: list[str] = TypeAdapter(list[str]).validate_python(
mod.FILES_TO_CARRYOVER
)
logger.debug(
"Successfully retrieved files to carry over: "
f"{files_to_carryover!r}"
)
return files_to_carryover
[docs]
@staticmethod
def create_shell(context: dict[str, Any] | None = None) -> Calculation:
"""Creates a minimal :class:`Calculation` with defaults set.
Args:
context: A dictionary to be used to seed values in the shell.
Defaults to None.
Returns:
A new :class:`Calculation` with no outputs.
"""
context = context or {}
return Calculation(
**Task.create_shell(context).model_dump(exclude_none=True),
calculation_inputs=context.get(
"calculation_inputs", CalculationInputs()
),
scheduler_inputs=context.get(
"scheduler_inputs", hpc.SchedulerInputs()
),
)
[docs]
@classmethod
def from_directory(
cls,
dir_name: str | pathlib.Path,
*,
strict_mode: bool = SETTINGS.STRICT_MODE,
magic_mode: bool = False,
calculator_type: CalculatorType | None = None,
task: Task | None = None,
) -> Self:
"""Generate a ``Calculation`` document from a calculation directory.
Args:
dir_name: The directory of a calculation.
strict_mode: Whether to raise an error if no output atoms found.
Defaults to ``SETTINGS.STRICT_MODE``.
magic_mode: Whether to defer the final object creation. If True,
the final object will be an instance of the class specified
by the ``_build_class`` attribute of the :class:`TaskMetadata`
object created. Otherwise, a :class:`Calculation` object will
be returned. Defaults to False.
calculator_type: The type of calculation run. Must correspond to
an ASE calculator.
task: A :class:`.task.Task` from which to build the Calculation.
.. deprecated:: 0.12.0
This parameter is ignored since task metadata, inputs, and
outputs are now **always** loaded using
``super().from_directory()``.
Returns:
A :class:`Calculation` object.
.. seealso::
:meth:`.task.Task.from_directory`
"""
logger.debug("Loading calculation from %s", dir_name)
logger.debug(f"Strict mode {'en' if strict_mode else 'dis'}abled")
if task:
msg = "This parameter is now ignored. See docs."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
if magic_mode:
return cls.load_magic(dir_name, strict_mode=strict_mode)
task = Task.from_directory(dir_name=dir_name, strict_mode=strict_mode)
calculation_inputs = CalculationInputs.from_directory(
dir_name=dir_name,
calculator_type=calculator_type,
)
calculator_type = CalculatorType(
calculation_inputs.ase_calculator.__name__.lower()
)
# TODO: Consider reading subset of outputs from Atoms object
# TODO: (e.g.,: forces, energy) and deferring calculation-specific
# TODO: properties to other schemes
calculation_outputs = CalculationOutputs.from_directory(
dir_name=dir_name,
calculator_type=calculator_type,
strict_mode=strict_mode,
)
scheduler_inputs = hpc.SchedulerInputs.from_directory(
dir_name=dir_name
)
scheduler_outputs = hpc.SchedulerOutputs.from_directory(
dir_name=dir_name
)
if task.task_outputs is None or task.task_outputs.atoms is None:
output_atoms = Calculation.get_output_atoms(
dir_name=dir_name,
input_atoms=task.task_inputs.atoms,
calculator_type=calculator_type,
strict_mode=strict_mode,
)
else:
output_atoms = task.task_outputs.atoms
with dir_name.joinpath(SETTINGS.PYTHON_SCRIPT).open(
mode="r", encoding="utf-8"
) as file:
structure_filename = extract_structure_name(file)
output_atoms.info["filename"] = structure_filename
task.patch_task(
converged=calculation_outputs.converged,
error=scheduler_outputs.error,
output_atoms=output_atoms,
files_to_carry_over=Calculation.get_files_to_carryover(
calculator_type
),
)
logger.debug("Successfully loaded calculation from %s", dir_name)
return cls(
**task.model_dump(exclude_none=True),
calculation_inputs=calculation_inputs,
calculation_outputs=calculation_outputs,
scheduler_inputs=scheduler_inputs,
scheduler_outputs=scheduler_outputs,
)
[docs]
def write_python_script(
self,
dst: pathlib.Path,
*,
template: str = SETTINGS.PYTHON_TEMPLATE,
structure_name: str | None = None,
) -> pathlib.Path:
"""Write the Python script used to run the Calculation.
Args:
dst: The directory in which to write the Python script.
template: The name of the template to use to write the Python
script.
structure_name: The filename of the input structure to be read to
load the :class:`~ase.atoms.Atoms` object for the calculation.
Defaults to the value of the ``"filename"`` key in the input
Atoms object, if present. Defaults to the value of
``SETTINGS.INPUT_ATOMS`` otherwise. Take care to ensure that
this matches the name of the file to which the structure is
written.
Returns:
A Path object representing the filename of the written Python
script.
"""
msgs = self.calculation_inputs.check_inputs()
for msg in msgs:
logger.warning(msg)
calculator = self.calculation_inputs.ase_calculator.__name__
parameters = self.calculation_inputs.parameters
for k, v in parameters.items():
parameters[k] = repr(v)
env = jinja2.Environment(
loader=get_loader(),
autoescape=False, # noqa: S701
trim_blocks=True,
lstrip_blocks=True,
keep_trailing_newline=True,
)
to_render = env.get_template(template)
filename = dst.joinpath(SETTINGS.PYTHON_SCRIPT)
if structure_name is None:
if (
self.task_inputs.atoms is None
or self.task_inputs.atoms.info.get("filename", None) is None
):
structure_name = SETTINGS.INPUT_ATOMS
else:
structure_name = self.task_inputs.atoms.info["filename"]
with filename.open(mode="x", encoding="utf-8") as file:
file.write(
to_render.render(
calculator=calculator,
structure=structure_name,
parameters=parameters,
run_bader=self.calculation_inputs.run_bader,
run_chargemol=self.calculation_inputs.run_chargemol,
)
)
return filename
[docs]
def write_script(
self,
dst: pathlib.Path,
*,
run_script_template: str = SETTINGS.SLURM_TEMPLATE,
**kwargs,
) -> pathlib.Path:
"""Write the SLURM input script using the template given.
Args:
dst: The directory in which to write the SLURM file.
run_script_template: The template file to use. Defaults to
``SETTINGS.SLURM_TEMPLATE``.
**kwargs: additional keyword arguments to be used to
render the script template.
Returns:
A Path representing the filename of the written SLURM script.
"""
msgs = self.scheduler_inputs.check_inputs()
for msg in msgs:
logger.warning(msg)
parameters = self.scheduler_inputs.model_dump(
mode="json", exclude_none=True, by_alias=True
)
slurm_parameters = {}
for key, value in parameters.items():
# ? Is this necessary given that we're dumping by alias above
new_key = key.replace("_", "-")
new_key = f"-{new_key}" if len(new_key) == 1 else f"--{new_key}"
slurm_parameters[new_key] = value
calculator = self.calculation_inputs.ase_calculator.__name__
kwargs = dict(
parameters=slurm_parameters,
**self.task_inputs.model_dump(),
calculator=calculator,
python_script=SETTINGS.PYTHON_SCRIPT,
slurm_script=SETTINGS.SLURM_SCRIPT,
)
# TODO: add CLI logic to using `construct_cli_call`
return super().write_script(
dst, run_script_template=run_script_template, **kwargs
)
@overload
def read_energy(
dir_name: pathlib.Path, *, strict_mode: Literal[False]
) -> float | None: ...
@overload
def read_energy(
dir_name: pathlib.Path, *, strict_mode: Literal[True]
) -> float: ...
[docs]
def read_energy(dir_name, *, strict_mode=SETTINGS.STRICT_MODE):
"""Read the final energy from the output atoms.
Args:
dir_name: The path to the calculation directory.
strict_mode: Whether or not to raise an error if unable to retrieve
the energy. If True, one of FileNotFound, PropertyNotImplemented,
or RuntimeError will be raised if unable to retrieve the energy.
Returns:
A float representing the energy. If unable to read the energy
and strict_mode is disable, this function will return None.
"""
logger.info(f"Reading generic energy from {dir_name}")
try:
atoms = ase.io.read(dir_name.joinpath("final.traj"), index=-1)
e = atoms.get_potential_energy()
logger.info(f"Successfully read generic energy from {dir_name}")
return e
except (FileNotFoundError, PropertyNotImplementedError, RuntimeError):
logger.warning(f"Failed to read generic energy from {dir_name}")
if strict_mode:
raise
return None
@overload
def read_forces(
dir_name: pathlib.Path, *, strict_mode: Literal[False]
) -> list[list[float]] | None: ...
@overload
def read_forces(
dir_name: pathlib.Path, *, strict_mode: Literal[True]
) -> list[list[float]]: ...
[docs]
def read_forces(
dir_name: pathlib.Path, *, strict_mode: bool = SETTINGS.STRICT_MODE
) -> list[list[float]]:
"""Read the final forces from the output atoms.
Args:
dir_name: The path to the calculation directory.
strict_mode: Whether or not to raise an error if unable to retrieve
the forces. If True, one of FileNotFound, PropertyNotImplemented,
or RuntimeError will be raised if unable to retrieve the forces.
Defaults to :attr:`SETTINGS.STRICT_MODE`.
Returns:
A list of lists of floats representing the forces. If unable to read
the energy and ``strict_mode`` is disable, this function will return
None.
"""
logger.info(f"Reading generic forces from {dir_name}")
try:
atoms = ase.io.read(dir_name.joinpath("final.traj"), index=-1)
f = atoms.get_forces()
logger.info(f"Successfully read generic forces from {dir_name}")
return f
except (FileNotFoundError, PropertyNotImplementedError, RuntimeError):
logger.warning(f"Failed to read generic forces from {dir_name}")
if strict_mode:
raise
return None