# Source code for autojob.bases.task_base
"""Represent and model the results of a task."""
from abc import ABC
from abc import abstractmethod
from datetime import UTC
from datetime import datetime
from enum import StrEnum
from enum import unique
import logging
from pathlib import Path
from typing import ClassVar
from typing import Protocol
from typing import Self
from typing import runtime_checkable
from pydantic import UUID4
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import model_validator
from pymatgen.entries.computed_entries import ComputedEntry
from autojob import SETTINGS
from autojob.utils.schemas import PydanticAtoms
from autojob.utils.schemas import id_factory
# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)

# Names of TaskMetadataBase fields treated as "task group" fields.
# NOTE(review): presumably these are the values shared by every task in
# the same task group -- confirm against the callers of this constant.
TASK_GROUP_FIELDS = [
    "label",
    "study_group_id",
    "study_id",
    "task_group_id",
    "date_created",
]
[docs]
@runtime_checkable
class PathLoadable(Protocol):
"""A protocol for objects that can be loaded from a directory."""
[docs]
@classmethod
def from_directory( # type: ignore[no-untyped-def]
cls,
src: str | Path,
**kwargs,
) -> Self:
"""Load an instance of the class from a directory.
Args:
src: The directory from which to load the object.
kwargs: Additional keyword arguments.
Returns:
An instance of the class or a subclass.
"""
[docs]
@runtime_checkable
class InputWriter(Protocol):
"""A protocol for objects that can write their inputs to a directory."""
[docs]
def write_inputs( # type: ignore[no-untyped-def]
self,
dest: str | Path,
**kwargs,
) -> list[Path]:
"""Write inputs of the class to the directory for execution.
This method should also perform any modifications necesary to prepare
inputs. For example, such a modification may be copying magnetic
moments to initial magnetic moments.
Args:
dest: The directory to which the inputs will be written.
kwargs: Additional keyword arguments.
Returns:
A list of input files written.
"""
[docs]
class SetTaskClassMixin:
    """Mixin that fills in ``task_metadata.task_class`` after validation.

    Intended for models that carry a ``task_metadata`` attribute with a
    ``task_class`` field.
    """

    @model_validator(mode="after")
    def set_task_class(self) -> Self:
        """Default ``task_class`` to the lower-cased name of this class.

        An explicitly provided ``task_class`` (anything other than ``None``)
        is left untouched.

        Returns:
            The validated instance itself.
        """
        metadata = self.task_metadata  # type: ignore[attr-defined]
        if metadata.task_class is None:
            metadata.task_class = type(self).__name__.lower()
        return self
[docs]
@unique
class TaskOutcome(StrEnum):
"""The outcome of a task.
The meanings of each outcome are as follows:
SUCCESS: The task completed successfully.
FAILED: The task completed with errors.
IDLE: The task is yet to run.
RUNNING: The task is running.
UNKNOWN: The outcome of the task is unknown.
"""
SUCCESS = "success"
FAILED = "failed"
IDLE = "idle"
RUNNING = "running"
UNKNOWN = "unknown"
[docs]
class TaskMetadataBase(BaseModel, ABC):
    """The metadata for a task.

    Note that :class:`TaskMetadataBase` instances are ``PathLoadable``.
    """

    # Fields may be populated by name, and unrecognised keys are kept.
    model_config = ConfigDict(populate_by_name=True, extra="allow")

    # ---- Descriptive fields -------------------------------------------
    label: str = Field(description="A description of the job", default="")
    tags: list[str] = Field(
        title="tag",
        description="Metadata tagged to a given job",
        default=[],
    )
    uri: str | None = Field(
        description="The uri for the directory containing this task",
        default=None,
    )

    # ---- Identifiers --------------------------------------------------
    # The ID unions are validated left to right, so UUID4 is attempted
    # before str. Defaults come from ``id_factory`` called with a
    # one-letter argument (presumably an ID prefix -- confirm in
    # autojob.utils.schemas).
    study_group_id: UUID4 | str | None = Field(
        description="The study group uuid",
        union_mode="left_to_right",
        default_factory=id_factory("g"),
    )
    study_id: UUID4 | str | None = Field(
        description="The study uuid",
        union_mode="left_to_right",
        default_factory=id_factory("s"),
    )
    workflow_step_id: UUID4 | None = Field(
        description="The workflow step uuid", default=None
    )
    task_id: UUID4 | str = Field(
        description="A UUID identifying the task.",
        union_mode="left_to_right",
        default_factory=id_factory("j"),
    )
    task_group_id: UUID4 | str | None = Field(
        description="A UUID identifying a group of tasks.",
        union_mode="left_to_right",
        default_factory=id_factory("c"),
    )

    # ---- Timestamps (timezone-aware, UTC) -----------------------------
    date_created: datetime = Field(
        description="The date and time that the task was first created.",
        default_factory=lambda: datetime.now(tz=UTC),
    )
    last_updated: datetime = Field(
        description=(
            "Timestamp for the most recent calculation for this "
            "task document"
        ),
        default_factory=lambda: datetime.now(tz=UTC),
    )

    # ``task_class`` makes it possible to load a task from a directory
    # whether or not it completed successfully: a metadata file should
    # always exist, whereas a task file may be missing.
    task_class: str | None = Field(
        description=(
            "The plugin name of the concrete TaskBase implementation "
            "to which this metadata belongs (e.g., task)"
        ),
        default=None,
    )

    @classmethod
    @abstractmethod
    def from_directory(cls, src: str | Path) -> Self:
        """Load task metadata from a directory.

        Args:
            src: The directory from which to load the metadata.

        Returns:
            The loaded metadata instance.
        """
class _TaskIODoc(BaseModel, ABC):
    """A base class for task input/output documents.

    Note that by definition, concrete implementations of the class adhere to
    the :class:`PathLoadable` protocol.
    """

    atoms: PydanticAtoms | None = Field(
        description="Input or output ase.Atoms", default=None
    )

    @classmethod
    @abstractmethod
    def from_directory(  # type: ignore[no-untyped-def]
        cls,
        src: str | Path,
        **kwargs,
    ) -> Self:
        """Build the document from the contents of a directory.

        Args:
            src: Directory to load from.
            kwargs: Optional keyword arguments:

                - strict_mode: Whether to fail on any error. Defaults to
                  `SETTINGS.STRICT_MODE`.
                - magic_mode: Whether to instantiate subclasses. When
                  True, the returned object must be an instance of the
                  type determined by metadata found in the directory.
                  Defaults to False.
                - cache_file: Filename of a cache file to load from.
                  Defaults to None, in which case the cache file is not
                  read. Useful when data should be re-loaded from the
                  directory.

        Returns:
            An instance of the calling class or of one of its subclasses.
        """
[docs]
class TaskInputsBase(_TaskIODoc):
    """The set of task-level inputs.

    Note that :class:`TaskInputsBase` instances are ``PathLoadable``.
    """

    # Unrecognised keys are kept on the model rather than rejected.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow")

    # ---- File bookkeeping ---------------------------------------------
    files_to_copy: list[str] = Field(
        description=(
            "The files to copy from the preceding task into "
            "the scratch directory of this task."
        ),
        default=[],
    )
    files_to_delete: list[str] = Field(
        description=(
            "The files to delete from the directory of the task "
            "after job completion."
        ),
        default=[],
    )
    files_to_carry_over: list[str] = Field(
        description=(
            "The files to carry over from the completed task "
            "to the new job."
        ),
        default=[],
    )

    # ---- Execution behaviour ------------------------------------------
    auto_restart: bool = Field(
        description=(
            "Whether or not to automatically restart this calculation with "
            "the same parameters if the task finishes unsuccessfully."
        ),
        default=True,
    )

    # ---- Filenames; defaults are taken from the package SETTINGS ------
    atoms_filename: str = Field(
        description="The filename of the input atoms",
        default=SETTINGS.INPUT_ATOMS_FILE,
    )
    task_script: str = Field(
        description="The default filename for the task script",
        default=SETTINGS.DEFAULT_TASK_SCRIPT_FILE,
    )
    task_script_template: str = Field(
        description="The name of the default task script template",
        default=SETTINGS.TASK_SCRIPT_TEMPLATE,
    )
[docs]
class TaskOutputsBase(_TaskIODoc):
    """The set of task-level outputs.

    Note that :class:`TaskOutputsBase` instances are ``PathLoadable``.
    """

    # Optional pymatgen ComputedEntry; None when not provided.
    entry: ComputedEntry | None = Field(
        description="The ComputedEntry from the task", default=None
    )
    # Outcome is UNKNOWN unless a value is supplied.
    outcome: TaskOutcome = Field(
        description="The outcome of the task", default=TaskOutcome.UNKNOWN
    )
[docs]
class TaskBase(BaseModel, ABC):
    """An abstract base class for tasks.

    Note that by definition, concrete implementations of the class adhere to
    the :class:`PathLoadable` and :class:`InputWriter` protocols.

    Concrete implementations of this class must be defined such that they
    can be instantiated without arguments.
    """

    # Unrecognised keys are kept on the model rather than rejected.
    model_config = ConfigDict(extra="allow")

    # None of the three fields declares a default here.
    task_metadata: TaskMetadataBase = Field(description="Task metadata")
    task_inputs: TaskInputsBase = Field(description="Task inputs")
    task_outputs: TaskOutputsBase | None = Field(description="Task outputs")

    @classmethod
    @abstractmethod
    def from_directory(  # type: ignore[no-untyped-def]
        cls, src: str | Path, **kwargs
    ) -> Self:
        """Build a task from the contents of a directory.

        Args:
            src: Directory to load the task from.
            kwargs: Optional keyword arguments:

                - strict_mode: Whether to fail on any error. Defaults to
                  `SETTINGS.STRICT_MODE`.
                - magic_mode: Whether to instantiate subclasses. When
                  True, the returned task must be an instance of the
                  type determined by metadata found in the directory.
                  Defaults to False.

        Returns:
            A task whose type is the calling class or one of its
            subclasses.
        """

    @abstractmethod
    def write_inputs(self, dest: str | Path, **kwargs) -> list[Path]:  # type: ignore[no-untyped-def]
        """Write the task into a directory so that it can be executed.

        At a minimum, concrete implementations must write an inputs JSON,
        a metadata file, and the input Atoms (when they are not None).

        Args:
            dest: Directory in which the inputs are written.
            kwargs: Implementation-specific keyword arguments.

        Returns:
            A list of paths (see the ``list[Path]`` annotation).
        """