Source code for autojob.next.restart

"""Restart a completed task.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path

        from autojob.next.restart import restart

        restart(dir_name=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob restart
"""

from collections.abc import Iterable
import logging
import math
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

import click

from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import STOP_FILE
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.next import substitute_context
from autojob.parametrizations import VariableReference
from autojob.parametrizations import create_parametrization
from autojob.utils.cli import MemoryFloat
from autojob.utils.cli import configure_settings
from autojob.utils.cli import mods_to_dict
from autojob.utils.files import find_finished_jobs
from autojob.workflow import Step

# Module-level logger; handlers/levels are configured by the CLI entry point.
logger = logging.getLogger(__name__)


# Imported only for static type checking; ``TaskBase`` is referenced as a
# string annotation in this module, so no runtime import is needed.
if TYPE_CHECKING:
    from autojob.bases.task_base import TaskBase

# Metadata field names passed as ``exclude_metadata`` when building the
# parametrization for a restarted task.
# NOTE(review): presumably these are excluded so the new task receives fresh
# identity/timestamp values instead of inheriting them -- confirm against
# ``create_parametrization``.
_to_exclude_on_restart = ["task_id", "uri", "date_created", "last_updated"]


def restart(
    src: str | Path | None = None,
    *,
    calc_mods: dict[str, Any] | None = None,
    sched_mods: dict[str, Any] | None = None,
    file_size_limit: float = FILE_SIZE_LIMIT,
    submit: bool = True,
    auto_restart: bool = False,  # noqa: ARG001
    files_to_carry_over: Iterable[str] | None = None,  # noqa: ARG001
) -> tuple["TaskBase", Path]:
    """Create a restart of the completed task found in a directory.

    The completed task is harvested from ``src`` and finalized, and a new
    task with the same task class and workflow step ID is created from it.
    The new task's input atoms are set to the completed task's output atoms.

    Args:
        src: The directory of the completed task. Falsy values (including
            None) select the current working directory.
        calc_mods: A dictionary mapping calculator parameters to values that
            should be used to overwrite the existing parameters. Defaults to
            no modifications.
        sched_mods: A dictionary mapping scheduler options to values that
            should be used to overwrite the existing parameters. Before use,
            the values are passed through ``substitute_context`` together
            with the completed task's metadata and a ``structure`` entry
            (the stem of the input atoms filename, or the literal
            ``"{structure}"`` if that stem is empty). Defaults to no
            modifications.
        file_size_limit: The file-size threshold forwarded to
            ``create_next_step``; files above this size will be deleted.
            Defaults to FILE_SIZE_LIMIT.
        submit: Whether or not to submit the new job after creation.
            Defaults to True.
        auto_restart: Currently unused by this function; accepted for
            interface compatibility.
        files_to_carry_over: Currently unused by this function; accepted for
            interface compatibility.

    Returns:
        A 2-tuple (task, path) where task is the newly created task and path
        is the Path of the directory containing it. Only the first item
        produced by ``create_next_step`` is returned.

    Raises:
        StopIteration: If ``harvest`` or ``create_next_step`` produces no
            items.

    Warning:
        When specifying `sched_mods`, be wary of setting mutually exclusive
        scheduler parameters (e.g, `mem` and `mem_per_cpu` or `cores` and
        `cores_per_node`). For example, if the `mem` parameter is set and one
        wants to set the `mem_per_cpu` parameter, set the `mem` key to
        `Unset` in `sched_mods` in addition to setting the `mem_per_cpu` key.
    """
    src = Path.cwd() if not src else Path(src)
    logger.debug(f"Restarting task in {src}")

    # Only the first harvested task is restarted.
    finished = next(iter(harvest(src, use_cache=True)))
    # TODO: must implement record keeping first, then expose as arg
    finalize_task(src=src, task=finished, record_task=False)

    # Template scheduler inputs with task metadata and structure name
    template_vars = finished.task_metadata.model_dump(exclude_none=True)
    structure_name = Path(finished.task_inputs.atoms_filename).stem
    template_vars["structure"] = structure_name or "{structure}"

    overrides = create_parametrization(
        finished,
        calc_mods=calc_mods or {},
        sched_mods=substitute_context(sched_mods or {}, template_vars),
        exclude_metadata=_to_exclude_on_restart,
    )

    # Set new input atoms to completed task's output atoms
    output_atoms_ref = VariableReference(
        set_path=["task_inputs", "atoms"],
        constant=finished.task_outputs.atoms,
    )
    overrides.append(output_atoms_ref)

    restart_step = Step(
        workflow_step_id=finished.task_metadata.workflow_step_id,
        task_class=finished.task_metadata.task_class,
        progression="independent",
        parametrizations=[overrides],
    )

    created = create_next_step(
        src=src,
        step=restart_step,
        previous_task=finished,
        file_size_limit=file_size_limit,
        submit=submit,
        restart=True,
    )
    new_task, new_task_dir = next(iter(created))

    logger.debug(f"Successfully created restart task from {src}")
    return new_task, new_task_dir
# TODO: expose all TaskInputs as CLI options @click.command( "restart", epilog=""" Warning: When specifying `sched_mods`, be wary of setting mutually exclusive scheduler parameters (e.g, mem and mem_per_cpu or cores and cores_per_node). For example, if the "mem" parameter is set and one wants to set the mem_per_cpu parameter, set the "mem" key to None in `sched_mods` in addition to setting the "mem_per_cpu" key. But this is unacceptable: Vasp(atoms) ------------------------------- EXAMPLES ------------------------------- 1. Submit the new job upon creation. autojob restart -S 2. Copy the WAVECAR & CHGCAR, submit the new job upon creation, print out all messages. \b autojob restart --carry-over WAVECAR --carry-over CHGCAR -S -vv 3. Submit the new job upon creation, enable auto-restart, print out all messages, delete all files 1 gigabyte or larger, reconfigure the calculator with ediff=1e-8. \b autojob restart -Savv --file-size-limit 1GB -c "ediff=1e-8" 4. Submit the new job upon creation, set the job name to the name of the structure file with the suffix "-vib". \b autojob restart -S --sched-mod="job-name={structure}-vib" """, context_settings={"help_option_names": ["-h", "--help"]}, ) @click.option( "-v", "--verbose", "verbosity", default=0, count=True, help="Controls the verbosity. 0: Show messages of level warning and " "higher. 1: Show messages of level info and higher. 2: Show all messages" "-useful for debugging.", show_default=True, ) @click.option( "-S", "--submit/--no-submit", default=False, help="Whether or not to submit the newly created job.", show_default=True, ) @click.option( "--file-size-limit", default=math.inf, help="Specify the file size above which all files in the old job larger " "than this will be deleted. Integers will be interpreted in bytes. The " "suffixes, KB, MB, and GB, will cause the value to be interpreted as " "kilobytes, megabytes, and gigabytes, respectively. 
No limit is set by " "default.", show_default=True, type=MemoryFloat(), metavar="MEM", ) @click.option( "-a", "--auto-restart/--no-auto-restart", default=False, help="Whether or not to add logic to vasp.sh to automatically resubmit " "a job on time out.", show_default=True, ) @click.option( "-c", "--calc-mod", "calc_mods", default={}, multiple=True, callback=mods_to_dict, help="Specify modifications to calculator parameters. Modifications " "must be passed as quoted parameter-value strings separated by an equals " 'sign (e.g., "parameter=value"). Values (even strings) must be specified ' "without additional quotations as they will be parsed into native Python " "types. Values can only be parsed into integers, floats, strings, " "booleans, or None. This option can be repeated.", metavar="MOD", ) @click.option( "-L", "--sched-mod", "sched_mods", default={}, multiple=True, callback=mods_to_dict, help="Specify modifications to scheduler parameters. Modifications " "must be passed as quoted parameter-value strings separated by an equals " 'sign (e.g., "parameter=value"). Values must be passed exactly as they ' 'should be passed to the scheduler (e.g., "mem=10GB"), and format strings ' "may be used to substitute select parameters. Variables that may be " "substituted include the structure filename, the study group, study, " "calculation, or job ID. However, the use of placeholders is currently" "only supported for the job-name SLURM option. This option may be " "repeated.", metavar="MOD", ) @click.option( "--config", default={}, multiple=True, callback=mods_to_dict, help="Configure autojob configuration settings (e.g., " '--config="log_file=autojob.log"). This option can be repeated.', ) @click.option( "--carry-over", "files_to_carry_over", default=(), help="Indicate a file to copy from the directory of the completed job to " "the directory of the new job. 
This option can be repeated.", multiple=True, show_default=True, ) @click.option( "-p", "--path", "paths", default=(Path().cwd(),), help="The path from which to create new job directories. This option can " "be repeated. Defaults to the current working directory.", multiple=True, type=click.Path(path_type=Path), ) @click.option( "-R", "--recursive/---no-recursive", default=False, help="Whether to search all subdirectories for jobs to restart in " "addition to those specified by the --path option.", show_default=True, ) def main( *, paths: tuple[Path, ...], verbosity: int, submit: bool, file_size_limit: float, auto_restart: bool, recursive: bool, calc_mods: dict[str, Any], sched_mods: dict[str, Any], files_to_carry_over: tuple[str, ...], config: dict[str, Any], ) -> None: """Restart a task.""" match verbosity: case 0: level = logging.WARNING case 1: level = logging.INFO case _: level = logging.DEBUG logging.basicConfig(level=level) configure_settings(config) if recursive: sub_dir_paths: set[Path] = set() for path in paths: sub_dir_paths.union(find_finished_jobs(path=path)) paths = tuple(sub_dir_paths) # TODO: Add whitelist/blacklist filtering for old_job in paths: if Path(old_job).joinpath(STOP_FILE).exists(): click.echo(f"Stop file found. Aborting restart ({old_job})") else: new_task, new_task_dir = restart( src=old_job, calc_mods=calc_mods, sched_mods=sched_mods, file_size_limit=file_size_limit, submit=submit, auto_restart=auto_restart, files_to_carry_over=files_to_carry_over or None, ) task_id = str(new_task.task_metadata.task_id) click.echo( f"New job created ({task_id}) {'/'.join(new_task_dir.parts[-4:])}" )