"""Utilities for creating tasks from existing task directories."""
import json
import logging
from pathlib import Path
import shutil
import subprocess
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
import click
from autojob import SETTINGS
from autojob.bases.task_base import TASK_GROUP_FIELDS
from autojob.harvest.archive import archive_json
from autojob.hpc import get_scheduler_command
from autojob.plugins import get_task_class
from autojob.utils.files import copy_permissions_and_ownership
from autojob.utils.files import create_templated_dir_name
from autojob.utils.templates import substitute_placeholders
if TYPE_CHECKING:
from autojob.bases.task_base import TaskBase
from autojob.parametrizations import VariableReference
from autojob.workflow import Step
logger = logging.getLogger(__name__)
FILE_SIZE_LIMIT = 1e8 # 100 MB
# If this file is present in the task directory, the task will not be restarted
STOP_FILE = "autojob.stop"
[docs]
def finalize_task(
src: Path,
task: "TaskBase",
record_task: bool = False,
) -> None:
"""Archive a completed task and note its completion in the study record.
Args:
src: A Path object indicating in which directory to archive the task.
task: The task to finalize.
record_task: Whether or not to record the completion of the task in
the study record. Defaults to False.
"""
if record_task:
with Path(src.parents[1], SETTINGS.RECORD_FILE).open(
mode="a", encoding="utf-8"
) as file:
file.write(f"{task.task_metadata.task_id}\n")
archive_file = Path(src, SETTINGS.ARCHIVE_FILE)
if not archive_file.exists():
archive_json([task], archive_file)
[docs]
def substitute_context(
mods: dict[str, Any],
context: dict[str, Any],
) -> dict[str, Any]:
"""Substitute context values into formatted strings.
Args:
mods: A dictionary mapping parameter names to values. String values
will be subsituted according to context values.
context: A dictionary mapping variable names to their values.
Variables with names corresponding to template names will be
substituted.
Returns:
A copy of `mods` with templated values substituted for their variable
values.
"""
new_mods: dict[str, Any] = {}
for key, value in mods.items():
if isinstance(value, str):
new_mods[key] = substitute_placeholders(value, **context)
else:
new_mods[key] = value
return new_mods
[docs]
def initialize_task(
*,
task_class: str,
parametrization: "list[VariableReference[Any]]",
previous_task: "TaskBase",
restart: bool = False,
) -> "TaskBase":
"""Setup a new task according to a parametrization.
Args:
task_class: A string representing the fully qualified class name
of the type of task to be created.
parametrization: The parametrization for the new task.
previous_task: The previous task.
restart: Whether or not the task to be created is a restart of a
previous task. Defaults to False.
Note:
`parametrization` **must** specify all parameters which are to be
inherited from `previous_task`. Any parameters that are not set by
`parametrization` will assume their default values.
Returns:
The new TaskBase instance.
"""
task_type = get_task_class(task_class)
context = previous_task.model_dump(exclude_none=True)
task_shell = task_type().model_dump(exclude_none=True) # type: ignore[call-arg]
for ref in parametrization:
ref.set_input_value(context, task_shell)
# TODO: add previous_task to TaskMetadataBase.parents once implemented
task_id = context["task_metadata"]["task_id"]
task_shell["task_metadata"]["tags"].append(str(task_id))
if not restart:
task_group_id = context["task_metadata"]["task_group_id"]
task_shell["task_metadata"]["tags"].append(str(task_group_id))
new_task = task_type(**task_shell)
return new_task
def _write_task_group_metadata(dest: Path, task: "TaskBase") -> None:
task_group_metadata_file = Path(dest, SETTINGS.TASK_GROUP_METADATA_FILE)
task_group_metadata = task.task_metadata.model_dump(
mode="json", include=set(TASK_GROUP_FIELDS)
)
task_group_metadata["tasks"] = [str(task.task_metadata.task_id)]
with task_group_metadata_file.open(mode="w", encoding="utf-8") as file:
json.dump(task_group_metadata, file, indent=4)
# TODO: Expand templating outside of metadata by flattening task
[docs]
def create_task_group_tree(
task: "TaskBase",
dest: Path,
*,
src: Path | None = None,
name_template: str | None = None,
) -> Path:
"""Create a new task group directory.
In addition to directory creation, this method will create a task group
metadata file and copy directory permissions and ownership to the new
directory.
Args:
task: The new task for which the task group directory will be made.
dest: The directory in which the new task group directory will be
created.
src: The directory of the completed task. Defaults to None in which
case permissions and ownership are not set.
name_template: A template to use for the directory name. Defaults to
None in which case the task group ID will be used.
Raises:
ValueError: Cannot create new task group without task group ID.
Returns:
The path to the newly created task group directory.
"""
if name_template:
context = task.task_metadata.model_dump(exclude_none=True)
context["structure"] = Path(task.task_inputs.atoms_filename).stem
conflict_dir = src.parent if src else dest
dir_name = create_templated_dir_name(
name_template, conflict_dir, context
)
else:
if task.task_metadata.task_group_id is None:
msg = "Cannot create new task group without task group ID."
raise ValueError(msg)
dir_name = str(task.task_metadata.task_group_id)
task_group = Path(dest, dir_name)
task_group.mkdir(parents=True)
if src:
copy_permissions_and_ownership(src.parent, task_group)
_write_task_group_metadata(task_group, task)
return task_group
[docs]
def carry_over_files(
*,
previous_task_src: Path,
new_task_dest: Path,
new_task: "TaskBase",
files_to_carry_over: list[str] | None = None,
) -> None:
"""Copy files from a previous task directory to a new task directory.
Args:
previous_task_src: A Path object representing the directory of the
completed task.
new_task_dest: A Path object representing the destination directory of
the new task.
new_task: The new task.
files_to_carry_over: A list of strings indicating the files to
carry over from the previous task. Defaults to an empty list.
"""
files_to_carry_over = files_to_carry_over or []
for file in files_to_carry_over:
try:
p = Path(previous_task_src, file)
copier = shutil.copytree if p.is_dir() else shutil.copy
copier(
src=Path(previous_task_src, file),
dst=Path(new_task_dest, file),
)
logger.info(
"Successfully copied %s to new task directory for task: %s",
file,
str(new_task.task_metadata.task_id),
)
except FileNotFoundError:
logger.warning(
"Unable to copy %s to new task directory for task: %s",
file,
str(new_task.task_metadata.task_id),
)
# TODO: Expand templating outside of metadata by flattening task
# TODO: Check origin directory when checking for directory uniqueness
# TODO: Add examples with(out) optional args
[docs]
def create_task_tree(
task: "TaskBase",
dest: Path,
*,
src: Path | None = None,
files_to_carry_over: list[str] | None = None,
name_template: str | None = None,
) -> Path:
"""Create a new task directory.
Args:
task: The new task for which the directory will be made.
dest: A Path representing the directory in which to create the
directory of the new task.
src: The source directory for the new task. Defaults to None in which
case no files will be carried over.
files_to_carry_over: A list of strings indicating the files to
carry over from the previous task. Defaults to None.
name_template: A template to use for the directory name. Defaults to
None in which case the task ID will be used.
Returns:
The path to the newly created task directory.
"""
if name_template:
context = task.task_metadata.model_dump(exclude_none=True)
context["structure"] = Path(task.task_inputs.atoms_filename).stem
conflict_dir = src.parent if src else dest
dir_name = create_templated_dir_name(
name_template, conflict_dir, context
)
else:
dir_name = str(task.task_metadata.task_id)
task_dest = Path(dest, dir_name)
task_dest.mkdir(parents=True)
if src:
copy_permissions_and_ownership(src, task_dest)
carry_over_files(
previous_task_src=src,
new_task_dest=task_dest,
new_task=task,
files_to_carry_over=files_to_carry_over,
)
_ = task.write_inputs(dest=task_dest)
return task_dest
[docs]
def add_item_to_parent(
item_id: str,
metadata_file: Path,
key: Literal["tasks", "task_groups"],
) -> None:
"""Add the given ID to the details.json of its parent.
Args:
item_id: The ID to add.
metadata_file: The path to the metadata file of the parent to which to
add the item ID.
key: The key to which to add. Either ``"tasks"`` or ``"task_groups"``.
"""
logger.debug(f"Adding {item_id} to {metadata_file}")
if not metadata_file.exists():
logger.warning(f"Unable to add item to metadata file: {metadata_file}")
return
with metadata_file.open(mode="r", encoding="utf-8") as file:
metadata = json.load(file)
metadata[key].append(item_id)
with metadata_file.open(mode="w", encoding="utf-8") as file:
json.dump(metadata, file, indent=4)
logger.debug(f"Successfully added {item_id} to {metadata_file}")
[docs]
def clean_up_task(
old_job: Path,
*,
file_size_limit: float = FILE_SIZE_LIMIT,
files_to_delete: list[str] | None = None,
) -> None:
"""Deletes large files from copied job.
Args:
old_job: A Path object representing the directory holding the
large files to be deleted.
file_size_limit: A float specifying the file size in bytes over which
files will be deleted. Defaults to ``FILE_SIZE_LIMIT``.
files_to_delete: A list of strings specifying files to delete.
Defaults to an empty list.
"""
files_to_delete = files_to_delete or []
for path in old_job.iterdir():
if path.name in files_to_delete or (
not path.is_symlink() and path.stat().st_size >= file_size_limit
):
file = path.resolve()
file.unlink()
logger.info(f"{'/'.join(file.parts[-5:])} deleted")
[docs]
def submit_new_task(new_task: Path, task_script: str) -> None:
"""Submit the newly created job to the Slurm scheduler.
Args:
new_task: A Path to the new task's directory.
task_script: The name of the task script.
"""
logger.info(f"Submitting task in {new_task}")
cmd = get_scheduler_command(SETTINGS.SCHEDULER)
output = subprocess.check_output(
[cmd, task_script],
cwd=new_task,
encoding="utf-8",
)
output = output.strip("\n")
job_name = "/".join(new_task.parts[-4:])
click.echo(f"{output} ({job_name})")
logger.info("%s (%s)", output, job_name)
[docs]
def create_next_step(
*,
src: Path,
step: "Step",
previous_task: "TaskBase",
files_to_carry_over: list[str] | None = None,
file_size_limit: float = FILE_SIZE_LIMIT,
submit: bool = True,
restart: bool = False,
# ? Consider setting at task_group/study level from file
name_template: str | None = None,
) -> list[tuple["TaskBase", Path]]:
"""Initiate a step by creating all tasks that are ready to start.
Args:
src: The source directory for the new tasks. That is, the directory
containing the recently completed task.
step: The Step to initiate.
previous_task: The previous task.
files_to_carry_over: A list of strings specifying files to copy from
the task directory of ``previous_task`` to that of the new task.
file_size_limit: A float specifying the threshold above which files
of this size will be deleted from the source directory. Defaults to
``FILE_SIZE_LIMIT``.
submit: Whether or not to submit the new TaskBases after creation. Defaults
to True.
restart: Whether or not the task to be created is a restart of a
previous task. Defaults to False.
name_template: A template to use for the directory name. Defaults to
None in which case the task ID will be used.
Returns:
A list of 2-tuples (task, path) where task is the new TaskBase instance
and path is the path in which it was dumped. For new task groups, `path`
will point to the new task group directory.
"""
files_to_carry_over = files_to_carry_over or []
tasks_and_dirs: list[tuple[TaskBase, Path]] = []
for parametrization in step.parametrizations:
new_task = initialize_task(
task_class=step.task_class,
parametrization=parametrization,
previous_task=previous_task,
restart=restart,
)
with TemporaryDirectory() as tmpdir:
if restart:
task_dest = Path(tmpdir)
item_id = str(new_task.task_metadata.task_id)
final_dest = src.parent
metadata_file = Path(
final_dest, SETTINGS.TASK_GROUP_METADATA_FILE
)
key: Literal["tasks", "task_groups"] = "tasks"
else:
task_dest = create_task_group_tree(
src=src,
task=new_task,
dest=Path(tmpdir),
name_template=name_template,
)
item_id = str(new_task.task_metadata.task_group_id)
final_dest = src.parent.parent
metadata_file = Path(final_dest, SETTINGS.STUDY_METADATA_FILE)
key = "task_groups"
new_task_dir = create_task_tree(
src=src,
task=new_task,
dest=task_dest,
files_to_carry_over=files_to_carry_over,
name_template=name_template,
)
new_tree = new_task_dir if restart else new_task_dir.parent
final_tree_name = Path(final_dest, new_tree.name)
shutil.copytree(src=new_tree, dst=final_tree_name)
add_item_to_parent(
item_id=item_id,
metadata_file=metadata_file,
key=key,
)
clean_up_task(
old_job=src,
file_size_limit=file_size_limit,
files_to_delete=previous_task.task_inputs.files_to_delete,
)
tasks_and_dirs.append((new_task, final_tree_name))
logger.debug(
"New task%s created %s",
"" if restart else " group",
"/".join(final_tree_name.parts[-4:]),
)
if submit:
submit_new_task(
new_task=final_tree_name,
task_script=new_task.task_inputs.task_script,
)
return tasks_and_dirs