Source code for autojob.next.advance

"""Semi-automatically advance workflows.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path

        from autojob.next.advance import advance

        advance(src=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob advance
"""

import logging
import math
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

import click
from pydantic import TypeAdapter

from autojob import SETTINGS
from autojob.bases.task_base import TaskOutcome
from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import STOP_FILE
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.utils.cli import MemoryFloat
from autojob.utils.cli import configure_settings
from autojob.utils.cli import mods_to_dict
from autojob.utils.files import find_finished_jobs
from autojob.workflow import Step
from autojob.workflow import Workflow

if TYPE_CHECKING:
    from autojob.bases.task_base import TaskBase

# Module-level logger named after this module's dotted import path.
logger = logging.getLogger(__name__)


def get_next_steps(task: "TaskBase", study_dir: Path) -> list[str]:
    """Determine which workflow steps should be started after ``task``.

    The workflow is rebuilt from ``study_dir`` and its steps are taken in
    static (dependency) order. Only the first step of that ordering is
    currently returned; ``task`` itself is consulted only for logging.

    Args:
        task: The task that has just completed.
        study_dir: The root directory of the study that contains ``task``.

    Returns:
        A list of workflow step IDs to start. It holds at most one entry,
        the first step in the workflow's static order, and is empty when
        the workflow yields no steps.
    """
    task_id = task.task_metadata.task_id
    logger.debug(f"Determining next steps for {task_id}")

    workflow = Workflow.from_directory(study_dir)

    # ! For backwards-compatibility, assume only the first task (a
    # ! relaxation Calculation) can fail; if it does, restart
    no_step = object()  # private sentinel: distinguishes "no nodes" from any node value
    first_step = next(iter(workflow.static_order()), no_step)
    next_steps = [] if first_step is no_step else [first_step]

    count = len(next_steps)
    plural = "" if count == 1 else "s"
    logger.debug(f"{count} next step{plural}")
    return next_steps
# TODO: Expand templating outside of metadata by flattening task
# TODO: support/implement setting the "label" key
# ! Note that all parametrizations of a given step are currently
# ! initiated at once
# TODO: Support calc/slurm_mods
def advance(
    *,
    src: Path,
    file_size_limit: float = FILE_SIZE_LIMIT,
    submit: bool = True,
) -> list[tuple["TaskBase", Path]]:
    """Advance to the next task in the workflow.

    The completed task in ``src`` is harvested and must have succeeded. It is
    then finalized (and recorded), the next workflow steps are determined
    from the study directory (two levels above ``src``), and a new task is
    created for every next step using the step definitions loaded from the
    parametrization file.

    Args:
        src: The directory of the completed calculation.
        file_size_limit: A float specifying the threshold above which files
            of this size will be deleted. Defaults to FILE_SIZE_LIMIT.
        submit: Whether or not to submit the new job after creation.
            Defaults to True.

    Raises:
        RuntimeError: If no task can be harvested from ``src``, or if the
            harvested task did not complete successfully.

    Returns:
        A list of tuples (task_i, path_i) where task_i is the ith created
        task and path_i is the Path representing the directory containing
        the ith created task.
    """
    logger.debug(f"Advancing task in {src}")
    # The study directory is assumed to sit two levels above the job
    # directory (study/<calculation>/<job>) - TODO confirm layout.
    study_dir = src.parent.parent

    harvested = iter(harvest(src, use_cache=True))
    try:
        completed_task = next(harvested)
    except StopIteration:
        # Don't let a bare StopIteration escape a regular function; it is
        # confusing to callers and is silently converted inside generators.
        msg = f"No task could be harvested from {src}! Cannot advance!"
        raise RuntimeError(msg) from None

    if (
        completed_task.task_outputs is None
        or completed_task.task_outputs.outcome != TaskOutcome.SUCCESS
    ):
        msg = "Task failed! Cannot advance to next step!"
        raise RuntimeError(msg)

    finalize_task(
        src=src,
        task=completed_task,
        record_task=True,
    )
    next_steps = get_next_steps(completed_task, study_dir)

    # NOTE(review): a relative PARAMETRIZATION_FILE is resolved against the
    # current working directory, not ``study_dir`` - confirm this is intended
    # when advancing jobs located outside the CWD.
    params = Path(SETTINGS.PARAMETRIZATION_FILE)
    steps = TypeAdapter(dict[str, Step]).validate_json(
        params.read_text(encoding="utf-8")
    )

    tasks_and_dirs: list[tuple["TaskBase", Path]] = []
    for step in next_steps:
        new_tasks_and_dirs = create_next_step(
            src=src,
            step=steps[step],
            previous_task=completed_task,
            file_size_limit=file_size_limit,
            submit=submit,
            restart=False,
        )
        tasks_and_dirs.extend(new_tasks_and_dirs)

    logger.debug(f"Successfully advanced from task in {src}")
    return tasks_and_dirs
# TODO: expose all TaskInputs as CLI options @click.command( "restart", epilog=""" ------------------------------- EXAMPLES ------------------------------- 1. Submit the next task upon creation. autojob advance -S """, context_settings={"help_option_names": ["-h", "--help"]}, ) @click.option( "-v", "--verbose", "verbosity", default=0, count=True, help="Controls the verbosity. 0: Show messages of level warning and " "higher. 1: Show messages of level info and higher. 2: Show all messages" "-useful for debugging.", show_default=True, ) @click.option( "-S", "--submit/--no-submit", default=False, help="Whether or not to submit the newly created job.", show_default=True, ) @click.option( "--file-size-limit", default=math.inf, help="Specify the file size above which all files in the old job larger " "than this will be deleted. Integers will be interpreted in bytes. The " "suffixes, KB, MB, and GB, will cause the value to be interpreted as " "kilobytes, megabytes, and gigabytes, respectively.", show_default=True, type=MemoryFloat(), metavar="MEM", ) # @click.option( # "-a", # "--auto-restart/--no-auto-restart", # default=False, # help="Whether or not to add logic to vasp.sh to automatically resubmit " # "a job on time out. Defaults '--no-auto-restart'.", # show_default=True, # ) # @click.option( # "-c", # "--calc-mod", # "calc_mods", # default={}, # multiple=True, # callback=mods_to_dict, # help="Specify modifications to calculator parameters. Modifications " # "must be passed as quoted parameter-value strings separated by an equals " # 'sign (e.g., "parameter=value"). Values (even strings) must be specified ' # "without additional quotations as they will be parsed into native Python " # "types. Values can only be parsed into integers, floats, strings, " # "booleans, or None. 
This option can be repeated.", # show_default=True, # ) # @click.option( # "-L", # "--sched-mod", # "sched_mods", # default={}, # multiple=True, # callback=mods_to_dict, # help="Specify modifications to scheduler parameters. Modifications " # "must be passed as quoted parameter-value strings separated by an equals " # 'sign (e.g., "parameter=value"). Values must be passed exactly as they ' # 'should be passed to the scheduler (e.g., "mem=10GB"), and format strings ' # "may be used to substitute select parameters. Variables that may be " # "substituted include the structure filename, the study group, study, " # "calculation, or job ID. ; however, the use of placeholders is currently" # "only supported for the job-name SLURM option. Note that only one of 'mem'" # "and 'mem-per-cpu' can be set. This option may be repeated.", # show_default=True, # ) @click.option( "--config", default={}, multiple=True, callback=mods_to_dict, help="Configure autojob parameters (e.g., " '--config="log_file=autojob.log"). This option can be repeated.', ) # @click.option( # "--carry-over", # "files_to_carry_over", # default=(), # help="Indicate a file to copy from the directory of the completed job to " # "the directory of the new job. This option can be repeated.", # multiple=True, # show_default=True, # ) @click.option( "-p", "--path", "paths", default=(Path().cwd(),), help="The path from which to create new job directories. This option can " "be repeated. 
Defaults to the current working directory.", multiple=True, type=click.Path(path_type=Path), ) @click.option( "-R", "--recursive/---no-recursive", default=False, help="Whether to search all subdirectories for jobs to restart in " "addition to those specified by the --path option.", show_default=True, ) def main( *, paths: tuple[Path, ...], verbosity: int, submit: bool, file_size_limit: float, recursive: bool, # auto_restart: bool, # calc_mods: dict[str, Any], # sched_mods: dict[str, Any], # files_to_carry_over: tuple[str, ...], config: dict[str, Any], ) -> None: """Advance to the next task in a workflow.""" match verbosity: case 0: level = logging.WARNING case 1: level = logging.INFO case _: level = logging.DEBUG logging.basicConfig(level=level) configure_settings(config) if recursive: sub_dir_paths: set[Path] = set() for path in paths: sub_dir_paths.union(find_finished_jobs(path=path)) paths = tuple(sub_dir_paths) # TODO: Add whitelist/blacklist filtering for old_job in paths: if Path(old_job).joinpath(STOP_FILE).exists(): click.echo(f"Stop file found. Aborting restart ({old_job})") else: tasks_and_dirs = advance( src=old_job, file_size_limit=file_size_limit, submit=submit, # calc_mods=calc_mods, # sched_mods=sched_mods, # auto_restart=auto_restart, # files_to_carry_over=files_to_carry_over or None, ) for new_task, new_task_dir in tasks_and_dirs: task_id = str(new_task.task_metadata.task_id) click.echo( f"New job created ({task_id}) {'/'.join(new_task_dir.parts[-4:])}" )