"""Semi-automatically advance workflows.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path
        from autojob.advance.advance import advance

        advance(src=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob advance
"""
import logging
import math
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
import click
from pydantic import TypeAdapter
from autojob import SETTINGS
from autojob.bases.task_base import TaskOutcome
from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import STOP_FILE
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.utils.cli import MemoryFloat
from autojob.utils.cli import configure_settings
from autojob.utils.cli import mods_to_dict
from autojob.utils.files import find_finished_jobs
from autojob.workflow import Step
from autojob.workflow import Workflow
if TYPE_CHECKING:
from autojob.bases.task_base import TaskBase
logger = logging.getLogger(__name__)
[docs]
def get_next_steps(task: "TaskBase", study_dir: Path) -> list[str]:
    """Return the workflow step IDs to start now that ``task`` has finished.

    The workflow is rebuilt from ``study_dir`` and walked via its
    ``static_order()``. For backwards compatibility, only the very first node
    of that order is returned: it is assumed that only the first task (a
    relaxation calculation) can fail, in which case it is restarted.

    Args:
        task: The task that just completed. Only its task ID is used, for
            logging.
        study_dir: Root directory of the study that contains ``task``.

    Returns:
        A list holding the first step ID of the workflow's static order, or an
        empty list if the workflow yields no steps.
    """
    logger.debug(f"Determining next steps for {task.task_metadata.task_id}")
    workflow = Workflow.from_directory(study_dir)

    # ! Backwards compatibility: assume only the first task (a relaxation
    # ! Calculation) can fail; if it does, restart it. Hence only the first
    # ! node of the static order is taken.
    # NOTE(review): the result does not depend on ``task`` at all, so every
    # call returns the same first step -- confirm this is intended.
    steps: list[str] = []
    for node in workflow.static_order():
        steps.append(node)
        break

    count = len(steps)
    suffix = "" if count == 1 else "s"
    logger.debug(f"{count} next step{suffix}")
    return steps
# TODO: Expand templating outside of metadata by flattening task
# TODO: support/implement setting the "label" key
# ! Note that all parametrizations of a given step are currently
# ! initiated at once
# TODO: Support calc/slurm_mods
[docs]
def advance(
    *,
    src: Path,
    file_size_limit: float = FILE_SIZE_LIMIT,
    submit: bool = True,
) -> list[tuple["TaskBase", Path]]:
    """Advance to the next task in the workflow.

    The completed task in ``src`` is harvested and checked for success. It is
    then finalized (and recorded), and each next workflow step is created from
    ``src``. Step definitions are looked up in the parametrization file named
    by ``SETTINGS.PARAMETRIZATION_FILE``.

    Args:
        src: The directory of the completed calculation.
        file_size_limit: A float specifying the threshold above which files
            of this size will be deleted. Defaults to FILE_SIZE_LIMIT.
        submit: Whether or not to submit the new job after creation. Defaults
            to True.

    Raises:
        RuntimeError: No task could be harvested from ``src``, or the harvested
            task did not succeed. In either case there is nothing to advance
            from.

    Returns:
        A list of tuples (task_i, path_i) where task_i is the ith created task
        and path_i is the Path representing the directory containing the ith
        created task.
    """
    logger.debug(f"Advancing task in {src}")
    # NOTE(review): assumes ``src`` sits exactly two levels below the study
    # root -- confirm against the expected directory layout.
    study_dir = src.parent.parent

    harvested = iter(harvest(src, use_cache=True))
    try:
        completed_task = next(harvested)
    except StopIteration:
        # A bare StopIteration escaping here is confusing for callers (and
        # would be converted to an opaque RuntimeError inside a generator per
        # PEP 479), so report the missing task explicitly.
        msg = f"No task found in {src}! Cannot advance to next step!"
        raise RuntimeError(msg) from None

    if (
        completed_task.task_outputs is None
        or completed_task.task_outputs.outcome != TaskOutcome.SUCCESS
    ):
        msg = "Task failed! Cannot advance to next step!"
        raise RuntimeError(msg)

    finalize_task(
        src=src,
        task=completed_task,
        record_task=True,
    )
    next_steps = get_next_steps(completed_task, study_dir)

    # NOTE(review): a relative PARAMETRIZATION_FILE is resolved against the
    # current working directory, not ``study_dir`` -- confirm this is intended.
    params = Path(SETTINGS.PARAMETRIZATION_FILE)
    with params.open(mode="r", encoding="utf-8") as file:
        steps = TypeAdapter(dict[str, Step]).validate_json(file.read())

    tasks_and_dirs: list[tuple["TaskBase", Path]] = []
    for step in next_steps:
        new_tasks_and_dirs = create_next_step(
            src=src,
            step=steps[step],
            previous_task=completed_task,
            file_size_limit=file_size_limit,
            submit=submit,
            restart=False,
        )
        tasks_and_dirs.extend(new_tasks_and_dirs)

    logger.debug(f"Successfully advanced from task in {src}")
    return tasks_and_dirs
# TODO: expose all TaskInputs as CLI options
@click.command(
"restart",
epilog="""
-------------------------------
EXAMPLES
-------------------------------
1. Submit the next task upon creation.
autojob advance -S
""",
context_settings={"help_option_names": ["-h", "--help"]},
)
@click.option(
"-v",
"--verbose",
"verbosity",
default=0,
count=True,
help="Controls the verbosity. 0: Show messages of level warning and "
"higher. 1: Show messages of level info and higher. 2: Show all messages"
"-useful for debugging.",
show_default=True,
)
@click.option(
"-S",
"--submit/--no-submit",
default=False,
help="Whether or not to submit the newly created job.",
show_default=True,
)
@click.option(
"--file-size-limit",
default=math.inf,
help="Specify the file size above which all files in the old job larger "
"than this will be deleted. Integers will be interpreted in bytes. The "
"suffixes, KB, MB, and GB, will cause the value to be interpreted as "
"kilobytes, megabytes, and gigabytes, respectively.",
show_default=True,
type=MemoryFloat(),
metavar="MEM",
)
# @click.option(
# "-a",
# "--auto-restart/--no-auto-restart",
# default=False,
# help="Whether or not to add logic to vasp.sh to automatically resubmit "
# "a job on time out. Defaults '--no-auto-restart'.",
# show_default=True,
# )
# @click.option(
# "-c",
# "--calc-mod",
# "calc_mods",
# default={},
# multiple=True,
# callback=mods_to_dict,
# help="Specify modifications to calculator parameters. Modifications "
# "must be passed as quoted parameter-value strings separated by an equals "
# 'sign (e.g., "parameter=value"). Values (even strings) must be specified '
# "without additional quotations as they will be parsed into native Python "
# "types. Values can only be parsed into integers, floats, strings, "
# "booleans, or None. This option can be repeated.",
# show_default=True,
# )
# @click.option(
# "-L",
# "--sched-mod",
# "sched_mods",
# default={},
# multiple=True,
# callback=mods_to_dict,
# help="Specify modifications to scheduler parameters. Modifications "
# "must be passed as quoted parameter-value strings separated by an equals "
# 'sign (e.g., "parameter=value"). Values must be passed exactly as they '
# 'should be passed to the scheduler (e.g., "mem=10GB"), and format strings '
# "may be used to substitute select parameters. Variables that may be "
# "substituted include the structure filename, the study group, study, "
# "calculation, or job ID. ; however, the use of placeholders is currently"
# "only supported for the job-name SLURM option. Note that only one of 'mem'"
# "and 'mem-per-cpu' can be set. This option may be repeated.",
# show_default=True,
# )
@click.option(
"--config",
default={},
multiple=True,
callback=mods_to_dict,
help="Configure autojob parameters (e.g., "
'--config="log_file=autojob.log"). This option can be repeated.',
)
# @click.option(
# "--carry-over",
# "files_to_carry_over",
# default=(),
# help="Indicate a file to copy from the directory of the completed job to "
# "the directory of the new job. This option can be repeated.",
# multiple=True,
# show_default=True,
# )
@click.option(
"-p",
"--path",
"paths",
default=(Path().cwd(),),
help="The path from which to create new job directories. This option can "
"be repeated. Defaults to the current working directory.",
multiple=True,
type=click.Path(path_type=Path),
)
@click.option(
"-R",
"--recursive/---no-recursive",
default=False,
help="Whether to search all subdirectories for jobs to restart in "
"addition to those specified by the --path option.",
show_default=True,
)
def main(
*,
paths: tuple[Path, ...],
verbosity: int,
submit: bool,
file_size_limit: float,
recursive: bool,
# auto_restart: bool,
# calc_mods: dict[str, Any],
# sched_mods: dict[str, Any],
# files_to_carry_over: tuple[str, ...],
config: dict[str, Any],
) -> None:
"""Advance to the next task in a workflow."""
match verbosity:
case 0:
level = logging.WARNING
case 1:
level = logging.INFO
case _:
level = logging.DEBUG
logging.basicConfig(level=level)
configure_settings(config)
if recursive:
sub_dir_paths: set[Path] = set()
for path in paths:
sub_dir_paths.union(find_finished_jobs(path=path))
paths = tuple(sub_dir_paths)
# TODO: Add whitelist/blacklist filtering
for old_job in paths:
if Path(old_job).joinpath(STOP_FILE).exists():
click.echo(f"Stop file found. Aborting restart ({old_job})")
else:
tasks_and_dirs = advance(
src=old_job,
file_size_limit=file_size_limit,
submit=submit,
# calc_mods=calc_mods,
# sched_mods=sched_mods,
# auto_restart=auto_restart,
# files_to_carry_over=files_to_carry_over or None,
)
for new_task, new_task_dir in tasks_and_dirs:
task_id = str(new_task.task_metadata.task_id)
click.echo(
f"New job created ({task_id}) {'/'.join(new_task_dir.parts[-4:])}"
)