# Source code for autojob.next.advance

"""Semi-automatically advance workflows.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path

        from autojob.next.advance import advance

        advance(src=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob advance
"""

import logging
from pathlib import Path
from typing import TYPE_CHECKING

from pydantic import TypeAdapter

from autojob import SETTINGS
from autojob.bases.task_base import TaskOutcome
from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.workflow import Step
from autojob.workflow import Workflow

if TYPE_CHECKING:
    from autojob.bases.task_base import TaskBase

logger = logging.getLogger(__name__)


def get_next_steps(task: "TaskBase", study_dir: Path) -> list[str]:
    """Get the UUIDs of the next steps in the workflow.

    The workflow is loaded from ``study_dir`` and only the first node of its
    static order is considered, so the result holds at most one entry.

    Args:
        task: The previous task. Only its task ID is read here, for logging.
        study_dir: The root directory of the study containing the completed
            task.

    Returns:
        A list containing the first step yielded by the workflow's static
        order, or an empty list if the workflow yields no steps.
    """
    logger.debug(f"Determining next steps for {task.task_metadata.task_id}")
    workflow = Workflow.from_directory(study_dir)
    ordered_steps = iter(workflow.static_order())

    # ! For backwards-compatibility, assume only the first task (a
    # ! relaxation Calculation) can fail; if it does, restart
    exhausted = object()  # private sentinel: distinguishes "no steps at all"
    first_step = next(ordered_steps, exhausted)
    next_steps = [] if first_step is exhausted else [first_step]

    step_count = len(next_steps)
    suffix = "" if step_count == 1 else "s"
    logger.debug(f"{step_count} next step{suffix}")
    return next_steps
# TODO: Expand templating outside of metadata by flattening task
# TODO: support/implement setting the "label" key
# ! Note that all parametrizations of a given step are currently
# ! initiated at once
# TODO: Support calc/slurm_mods
def advance(
    *,
    src: Path,
    file_size_limit: float = FILE_SIZE_LIMIT,
    submit: bool = True,
    name_template: str | None = None,
) -> list[tuple["TaskBase", Path]]:
    """Advance to the next task in the workflow.

    The task in ``src`` is harvested, checked for success, and finalized.
    Every step returned by ``get_next_steps`` is then created from it.

    Args:
        src: The directory of the completed calculation. The study directory
            is taken to be two levels above it (``src.parent.parent``).
        file_size_limit: A float specifying the threshold above which files
            of this size will be deleted. Passed through to
            ``create_next_step``. Defaults to FILE_SIZE_LIMIT.
        submit: Whether or not to submit the new job after creation.
            Defaults to True.
        name_template: A template to use for the directory name. Defaults to
            None in which case the task ID will be used.

    Raises:
        RuntimeError: No task could be harvested from ``src``, or the
            harvested task has no outputs or did not finish successfully.

    Returns:
        A list of tuples (task_i, path_i) where task_i is the ith created
        task and path_i is the Path representing the directory containing
        the ith created task.
    """
    logger.debug(f"Advancing task in {src}")
    study_dir = src.parent.parent

    # Passing a default to next() keeps an empty harvest from leaking a bare
    # StopIteration, which could silently terminate an enclosing iteration;
    # report it as the RuntimeError this function documents instead.
    completed_task = next(iter(harvest(src, use_cache=True)), None)
    if completed_task is None:
        msg = f"No task found in {src}! Cannot advance to next step!"
        raise RuntimeError(msg)

    outputs = completed_task.task_outputs
    if outputs is None or outputs.outcome != TaskOutcome.SUCCESS:
        msg = "Task failed! Cannot advance to next step!"
        raise RuntimeError(msg)

    # Finalize before creating anything new, as in the original ordering.
    finalize_task(
        src=src,
        task=completed_task,
        record_task=True,
    )

    next_steps = get_next_steps(completed_task, study_dir)

    # NOTE(review): unless the setting is an absolute path, this file is
    # resolved against the current working directory rather than ``src`` or
    # ``study_dir`` -- confirm this is intended.
    params = Path(SETTINGS.PARAMETRIZATION_FILE)
    steps = TypeAdapter(dict[str, Step]).validate_json(
        params.read_text(encoding="utf-8")
    )

    tasks_and_dirs: list[tuple["TaskBase", Path]] = []

    for step in next_steps:
        new_tasks_and_dirs = create_next_step(
            src=src,
            step=steps[step],
            previous_task=completed_task,
            file_size_limit=file_size_limit,
            submit=submit,
            restart=False,
            name_template=name_template,
        )
        tasks_and_dirs.extend(new_tasks_and_dirs)

    logger.debug(f"Successfully advanced from task in {src}")
    return tasks_and_dirs