"""Restart a completed task.

Examples:
    Programmatically,

    .. code-block:: python

        from pathlib import Path

        from autojob.next.restart import restart

        restart(src=Path.cwd())

    From the command-line,

    .. code-block:: console

        autojob restart
"""
from collections.abc import Iterable
import logging
import math
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
import click
from autojob.harvest.harvest import harvest
from autojob.next import FILE_SIZE_LIMIT
from autojob.next import STOP_FILE
from autojob.next import create_next_step
from autojob.next import finalize_task
from autojob.next import substitute_context
from autojob.parametrizations import VariableReference
from autojob.parametrizations import create_parametrization
from autojob.utils.cli import MemoryFloat
from autojob.utils.cli import configure_settings
from autojob.utils.cli import mods_to_dict
from autojob.utils.files import find_finished_jobs
from autojob.workflow import Step
if TYPE_CHECKING:
    # Only needed for the return annotation of ``restart``; kept out of the
    # runtime import graph.
    from autojob.bases.task_base import TaskBase

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Metadata fields passed as ``exclude_metadata`` when the parametrization for
# the restarted task is built from the completed task.
_to_exclude_on_restart = [
    "task_id",
    "uri",
    "date_created",
    "last_updated",
]
# Programmatic entry point; the ``main`` command below wraps it for the CLI.
def restart(
    src: str | Path | None = None,
    *,
    calc_mods: dict[str, Any] | None = None,
    sched_mods: dict[str, Any] | None = None,
    file_size_limit: float = FILE_SIZE_LIMIT,
    submit: bool = True,
    auto_restart: bool = False,  # noqa: ARG001
    files_to_carry_over: Iterable[str] | None = None,  # noqa: ARG001
) -> tuple["TaskBase", Path]:
    """Create a new task that restarts the completed task found in ``src``.

    The completed task is harvested from ``src`` and finalized, and a new
    task of the same class and workflow step is created from it. The new
    task's input atoms are set to the completed task's output atoms.

    Args:
        src: The directory of the completed task. Defaults to the current
            working directory.
        calc_mods: A dictionary mapping calculator parameters to values that
            should be used to overwrite the existing parameters.
        sched_mods: A dictionary mapping Slurm options to values that
            should be used to overwrite the existing parameters. Values are
            templated with the completed task's metadata and a ``structure``
            key (the stem of the input atoms filename) before being applied.
        file_size_limit: A float specifying the threshold above which files
            of this size will be deleted. Defaults to FILE_SIZE_LIMIT.
        submit: Whether or not to submit the new job after creation. Defaults
            to True.
        auto_restart: Whether or not to add logic to automatically restart the
            calculation. Currently accepted for interface compatibility but
            not used by this function.
        files_to_carry_over: Which files to carry over from the old job
            directory to the new job directory. Currently accepted for
            interface compatibility but not used by this function.

    Returns:
        A 2-tuple ``(task, path)`` where ``task`` is the newly created task
        and ``path`` is the directory containing it.

    Raises:
        FileNotFoundError: If no task could be harvested from ``src``.

    Warning:
        When specifying `sched_mods`, be wary of setting mutually exclusive
        scheduler parameters (e.g, `mem` and `mem_per_cpu` or `cores` and
        `cores_per_node`). For example, if the `mem` parameter is set and one
        wants to set the `mem_per_cpu` parameter, set the `mem` key to `Unset`
        in `sched_mods` in addition to setting the `mem_per_cpu` key.
    """
    src = Path(src) if src else Path.cwd()
    logger.debug("Restarting task in %s", src)
    calc_mods = calc_mods or {}
    sched_mods = sched_mods or {}

    # Only the first harvested task is restarted. Guard against an empty
    # harvest so callers get a descriptive error instead of a bare
    # StopIteration escaping from ``next``.
    completed_task = next(iter(harvest(src, use_cache=True)), None)
    if completed_task is None:
        msg = f"No task could be harvested from {src}"
        raise FileNotFoundError(msg)

    finalize_task(
        src=src,
        task=completed_task,
        # TODO: must implement record keeping first, then expose as arg
        record_task=False,
    )

    # Template scheduler inputs with task metadata and structure name
    context = completed_task.task_metadata.model_dump(exclude_none=True)
    # An empty stem falls back to the literal placeholder text.
    # NOTE(review): presumably so "{structure}" survives substitution
    # unchanged -- confirm against substitute_context.
    context["structure"] = (
        Path(completed_task.task_inputs.atoms_filename).stem or "{structure}"
    )
    sched_mods = substitute_context(sched_mods, context)

    parametrization = create_parametrization(
        completed_task,
        calc_mods=calc_mods,
        sched_mods=sched_mods,
        exclude_metadata=_to_exclude_on_restart,
    )
    # Set new input atoms to completed task's output atoms
    parametrization.append(
        VariableReference(
            set_path=["task_inputs", "atoms"],
            constant=completed_task.task_outputs.atoms,
        )
    )

    # A single-parametrization step that reuses the completed task's
    # workflow step ID and task class.
    step = Step(
        workflow_step_id=completed_task.task_metadata.workflow_step_id,
        task_class=completed_task.task_metadata.task_class,
        progression="independent",
        parametrizations=[parametrization],
    )

    # Only the first (task, directory) pair produced for the step is used.
    new_task, new_task_dir = next(
        iter(
            create_next_step(
                src=src,
                step=step,
                previous_task=completed_task,
                file_size_limit=file_size_limit,
                submit=submit,
                restart=True,
            )
        )
    )
    logger.debug("Successfully created restart task from %s", src)
    return new_task, new_task_dir
# TODO: expose all TaskInputs as CLI options
@click.command(
"restart",
epilog="""
Warning: When specifying `sched_mods`, be wary of setting mutually exclusive
scheduler parameters (e.g, mem and mem_per_cpu or cores and
cores_per_node). For example, if the "mem" parameter is set and one
wants to set the mem_per_cpu parameter, set the "mem" key to None in
`sched_mods` in addition to setting the "mem_per_cpu" key.
But this is unacceptable:
Vasp(atoms)
-------------------------------
EXAMPLES
-------------------------------
1. Submit the new job upon creation.
autojob restart -S
2. Copy the WAVECAR & CHGCAR, submit the new job upon creation, print
out all messages.
\b
autojob restart --carry-over WAVECAR --carry-over CHGCAR -S -vv
3. Submit the new job upon creation, enable auto-restart, print out all
messages, delete all files 1 gigabyte or larger, reconfigure the calculator
with ediff=1e-8.
\b
autojob restart -Savv --file-size-limit 1GB -c "ediff=1e-8"
4. Submit the new job upon creation, set the job name to the name of the
structure file with the suffix "-vib".
\b
autojob restart -S --sched-mod="job-name={structure}-vib"
""",
context_settings={"help_option_names": ["-h", "--help"]},
)
@click.option(
"-v",
"--verbose",
"verbosity",
default=0,
count=True,
help="Controls the verbosity. 0: Show messages of level warning and "
"higher. 1: Show messages of level info and higher. 2: Show all messages"
"-useful for debugging.",
show_default=True,
)
@click.option(
"-S",
"--submit/--no-submit",
default=False,
help="Whether or not to submit the newly created job.",
show_default=True,
)
@click.option(
"--file-size-limit",
default=math.inf,
help="Specify the file size above which all files in the old job larger "
"than this will be deleted. Integers will be interpreted in bytes. The "
"suffixes, KB, MB, and GB, will cause the value to be interpreted as "
"kilobytes, megabytes, and gigabytes, respectively. No limit is set by "
"default.",
show_default=True,
type=MemoryFloat(),
metavar="MEM",
)
@click.option(
"-a",
"--auto-restart/--no-auto-restart",
default=False,
help="Whether or not to add logic to vasp.sh to automatically resubmit "
"a job on time out.",
show_default=True,
)
@click.option(
"-c",
"--calc-mod",
"calc_mods",
default={},
multiple=True,
callback=mods_to_dict,
help="Specify modifications to calculator parameters. Modifications "
"must be passed as quoted parameter-value strings separated by an equals "
'sign (e.g., "parameter=value"). Values (even strings) must be specified '
"without additional quotations as they will be parsed into native Python "
"types. Values can only be parsed into integers, floats, strings, "
"booleans, or None. This option can be repeated.",
metavar="MOD",
)
@click.option(
"-L",
"--sched-mod",
"sched_mods",
default={},
multiple=True,
callback=mods_to_dict,
help="Specify modifications to scheduler parameters. Modifications "
"must be passed as quoted parameter-value strings separated by an equals "
'sign (e.g., "parameter=value"). Values must be passed exactly as they '
'should be passed to the scheduler (e.g., "mem=10GB"), and format strings '
"may be used to substitute select parameters. Variables that may be "
"substituted include the structure filename, the study group, study, "
"calculation, or job ID. However, the use of placeholders is currently"
"only supported for the job-name SLURM option. This option may be "
"repeated.",
metavar="MOD",
)
@click.option(
"--config",
default={},
multiple=True,
callback=mods_to_dict,
help="Configure autojob configuration settings (e.g., "
'--config="log_file=autojob.log"). This option can be repeated.',
)
@click.option(
"--carry-over",
"files_to_carry_over",
default=(),
help="Indicate a file to copy from the directory of the completed job to "
"the directory of the new job. This option can be repeated.",
multiple=True,
show_default=True,
)
@click.option(
"-p",
"--path",
"paths",
default=(Path().cwd(),),
help="The path from which to create new job directories. This option can "
"be repeated. Defaults to the current working directory.",
multiple=True,
type=click.Path(path_type=Path),
)
@click.option(
"-R",
"--recursive/---no-recursive",
default=False,
help="Whether to search all subdirectories for jobs to restart in "
"addition to those specified by the --path option.",
show_default=True,
)
def main(
*,
paths: tuple[Path, ...],
verbosity: int,
submit: bool,
file_size_limit: float,
auto_restart: bool,
recursive: bool,
calc_mods: dict[str, Any],
sched_mods: dict[str, Any],
files_to_carry_over: tuple[str, ...],
config: dict[str, Any],
) -> None:
"""Restart a task."""
match verbosity:
case 0:
level = logging.WARNING
case 1:
level = logging.INFO
case _:
level = logging.DEBUG
logging.basicConfig(level=level)
configure_settings(config)
if recursive:
sub_dir_paths: set[Path] = set()
for path in paths:
sub_dir_paths.union(find_finished_jobs(path=path))
paths = tuple(sub_dir_paths)
# TODO: Add whitelist/blacklist filtering
for old_job in paths:
if Path(old_job).joinpath(STOP_FILE).exists():
click.echo(f"Stop file found. Aborting restart ({old_job})")
else:
new_task, new_task_dir = restart(
src=old_job,
calc_mods=calc_mods,
sched_mods=sched_mods,
file_size_limit=file_size_limit,
submit=submit,
auto_restart=auto_restart,
files_to_carry_over=files_to_carry_over or None,
)
task_id = str(new_task.task_metadata.task_id)
click.echo(
f"New job created ({task_id}) {'/'.join(new_task_dir.parts[-4:])}"
)