Source code for autojob.next.restart

"""Restart a completed task.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path

        from autojob.next.restart import restart

        restart(src=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob restart
"""

from collections.abc import Iterable
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal

from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.next import substitute_context
from autojob.parametrizations import VariableReference
from autojob.parametrizations import create_parametrization
from autojob.workflow import Step

logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from autojob.bases.task_base import TaskBase

_to_exclude_on_restart = ["task_id", "uri", "date_created", "last_updated"]


# TODO: add files_to_carryover to parametrization
# TODO: Remove auto_restart argument
[docs] def restart( src: str | Path | None = None, *, calc_mods: dict[str, Any] | None = None, sched_mods: dict[str, Any] | None = None, file_size_limit: float = FILE_SIZE_LIMIT, submit: bool = False, auto_restart: bool = False, # noqa: ARG001 files_to_carry_over: Iterable[str] | None = None, name_template: str | None = None, strictness: Literal["strict", "relaxed"] | None = None, ) -> tuple["TaskBase", Path]: """Advance to the next task in the workflow. When this function is executed for a *structured directory*, this function updates study and task group metadata files accordingly. Args: src: The directory of the completed task. Defaults to the current working directory. calc_mods: A dictionary mapping calculator parameters to values that should be used to overwrite the existing parameters. sched_mods: A dictionary mapping Slurm options to values that should be used to overwrite the existing parameters. file_size_limit: A float specifying the threshold above which files of this size will be deleted. Defaults to FILE_SIZE_LIMIT. submit: Whether or not to submit the new job after creation. Defaults to False. auto_restart: Whether or not to add logic to automatically restart the calculation after the calculation has converged. files_to_carry_over: A list of strings indicating which files to carry over from the old job directory to the new job directory. Defaults to None, in which case, the files to copy are determined from the previous task. name_template: A template to use for the directory name. Defaults to None in which case the task ID will be used. strictness: How to treat tasks for which errors are thrown during their harvesting. See :func:`~autojob.harvest.harvest.harvest` for details. Returns: A list of tuples (task_i, path_i) where task_i is the ith created task and path_i is the Path representing the directory containing the ith created task. Note: When setting the input parameters for the restart task, this function will assume that any top-level task attribute suffixed with `_inputs` is an input. For example, when restarting a :class:`.Calculation`, `attr`:`.Calculation.task_inputs`, `attr`:`.Calculation.calculation_inputs`, and `attr`:`.Calculation.scheduler_inputs` will be carried over. When restarting a :class:`.MolecularDynamics` task, `attr`:`.Calculation.md_inputs` will be carried over in addition to the three aforementioned inputs. Warning: When specifying `sched_mods`, be wary of setting mutually exclusive scheduler parameters (e.g, `mem` and `mem_per_cpu` or `cores` and `cores_per_node`). For example, if the `mem` parameter is set and one wants to set the `mem_per_cpu` parameter, set the `mem` key to `Unset` in `sched_mods` in addition to setting the `mem_per_cpu` key. Example: Run and restart calculation with EMT calculator >>> from ase.calculors.emt import EMT >>> from ase.build import bulk, molecule >>> from ase.optimize.optimize import LBFGS >>> from autojob.tasks.calculation import Calculation >>> cu_surface = bulk("Cu") * 3 >>> cu_surface.center(vacuum=(0, 0, 10.0)) >>> co2 = molecule("CO2") >>> co2.set_tags([1] * len(co2)) >>> complex = cu_surface.copy() >>> complex += co2 >>> opt = LBFGS(complex) >>> calc = Calculation(...) >>> calc.write_inputs(...) # After submitting the calculation locally or running on a cluster >>> restart(...) """ src = Path(src) if src else Path.cwd() logger.debug(f"Restarting task in {src}") calc_mods = calc_mods or {} sched_mods = sched_mods or {} completed_task = next( iter(harvest(src, strictness=strictness, use_cache=True)) ) finalize_task( src=src, task=completed_task, # TODO: must implement record keeping first, then expose as arg record_task=False, ) files_to_carry_over = ( files_to_carry_over or completed_task.task_inputs.files_to_carry_over ) # Template scheduler inputs with task metadata and structure name context = completed_task.task_metadata.model_dump(exclude_none=True) context["structure"] = ( Path(completed_task.task_inputs.atoms_filename).stem or "{structure}" ) sched_mods = substitute_context(sched_mods, context) parametrization = create_parametrization( completed_task, calc_mods=calc_mods, sched_mods=sched_mods, exclude_metadata=_to_exclude_on_restart, ) # Set new input atoms to completed task's output atoms parametrization.append( VariableReference( set_path=["task_inputs", "atoms"], constant=completed_task.task_outputs.atoms, ) ) step = Step( workflow_step_id=completed_task.task_metadata.workflow_step_id, task_class=completed_task.task_metadata.task_class, progression="independent", parametrizations=[parametrization], ) new_task, new_task_dir = next( iter( create_next_step( src=src, step=step, previous_task=completed_task, files_to_carry_over=files_to_carry_over, file_size_limit=file_size_limit, submit=submit, restart=True, name_template=name_template, ) ) ) logger.debug(f"Successfully created restart task from {src}") return new_task, new_task_dir