"""Utilities for handling files and directories."""
import contextlib
from datetime import UTC
from datetime import datetime
from datetime import timedelta
import json
import logging
from pathlib import Path
import re
import shutil
import socket
import subprocess
from typing import Any
import jinja2
from autojob import SETTINGS
logger = logging.getLogger(__name__)
[docs]
def get_uri(dir_name: str | Path) -> str:
"""Return the URI path for a directory.
This allows files hosted on different file servers to have distinct
locations.
Adapted from Atomate2.
Arg:
dir_name: A directory name.
Returns:
Full URI path, e.g., "fileserver.host.com:/full/path/of/dir_name".
"""
fullpath = Path(dir_name).absolute()
hostname = socket.gethostname()
with contextlib.suppress(socket.gaierror, socket.herror):
hostname = socket.gethostbyaddr(hostname)[0]
return f"{hostname}:{fullpath}"
[docs]
def get_slurm_job_id(job_dir: Path) -> int:
    """Return the SLURM job id for the job run in the directory ``job_dir``.

    The id is parsed from the name of the SLURM output file
    (``slurm-<id>.out``). If the directory holds several such files, the
    numerically largest id is returned.

    Args:
        job_dir: The directory containing the slurm output file.

    Raises:
        FileNotFoundError: SLURM output file not found.

    Returns:
        The SLURM job id.
    """
    # The dot is escaped so that only a literal ".out" suffix is accepted.
    slurm_re = re.compile(r"slurm-(\d+)\.out")
    matches = (slurm_re.fullmatch(path.name) for path in job_dir.iterdir())
    # Compare the ids as integers: a lexicographic ordering of the file names
    # would rank "slurm-99.out" above "slurm-100.out".
    job_ids = [int(match[1]) for match in matches if match]
    if job_ids:
        return max(job_ids)

    msg = f"No slurm output file found in {'/'.join(job_dir.parts[-4:])}"
    raise FileNotFoundError(msg)
def _add_memory_to_stats(slurm_job_id: int, job_stats_file: Path) -> None:
    """Record the memory used by a Slurm job in its job stats file.

    The "Memory Utilized" value printed by ``seff`` is stored under
    ``job_stats["jobs"][0]["max_rss"]`` and the JSON file is rewritten in
    place.

    Args:
        slurm_job_id: The Slurm job ID for the job.
        job_stats_file: The existing JSON job stats file to update.

    Raises:
        RuntimeError: ``seff`` exited with an error, or its output contained
            no matching "Memory Utilized" entry.
    """
    logger.debug(
        "Add memory to job stats file for Slurm job: %s", slurm_job_id
    )
    slurm_cmd = [
        "/usr/bin/env",
        "seff",
        str(slurm_job_id),
    ]
    failure_msg = (
        f"Unable to add memory to job stats file for Slurm job: {slurm_job_id}"
    )

    # Only the subprocess call is guarded, so unrelated errors raised while
    # reading or writing the stats file are not mislabelled.
    try:
        output = subprocess.check_output(slurm_cmd, text=True)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(failure_msg) from err

    # NOTE(review): only values reported in GB match this pattern; confirm
    # whether seff can report other units for the jobs of interest.
    memory_re = re.compile(r"Memory Utilized: (?P<memory>\d+\.\d{2} GB)")
    match = memory_re.search(output)
    if match is None:
        raise RuntimeError(failure_msg)
    memory = match.group("memory")

    with job_stats_file.open(mode="r", encoding="utf-8") as file:
        job_stats = json.load(file)

    job_stats["jobs"][0]["max_rss"] = memory

    with job_stats_file.open(mode="w", encoding="utf-8") as file:
        json.dump(job_stats, file, indent=4)

    logger.debug(
        "Successfully added memory to job stats file for Slurm job: %s",
        slurm_job_id,
    )
[docs]
def create_job_stats_file(slurm_job_id: int, job_dir: str | Path) -> Path:
    """Write the statistics of a completed Slurm job to a file.

    The JSON output of ``sacct`` for the job is saved in ``job_dir`` under
    the name ``SETTINGS.SCHEDULER_STATS_FILE``; the job's memory usage is
    then added to that file.

    Args:
        slurm_job_id: The Slurm job ID for the job.
        job_dir: The job directory.

    Raises:
        RuntimeError: Unable to create job stats file.

    Returns:
        A Path to the file containing the job statistics.
    """
    logger.debug(f"Creating job stats file for Slurm job: {slurm_job_id}")
    stats_path = Path(job_dir) / SETTINGS.SCHEDULER_STATS_FILE
    sacct_cmd = ["/usr/bin/env", "sacct", "--json", f"--jobs={slurm_job_id}"]

    try:
        sacct_json = subprocess.check_output(sacct_cmd, text=True)
    except subprocess.CalledProcessError as err:
        msg = (
            f"Unable to create job stats file for job in directory: {job_dir}"
        )
        raise RuntimeError(msg) from err

    with stats_path.open(mode="w", encoding="utf-8") as stats_file:
        stats_file.write(sacct_json)

    # Augment the raw sacct output with the memory figure.
    _add_memory_to_stats(slurm_job_id, stats_path)
    logger.debug(
        f"Successfully created job stats file for Slurm job: {slurm_job_id}"
    )
    return stats_path
[docs]
def find_study_group_dirs(path: Path | None = None) -> list[Path]:
    """Find all study group directories in the directory tree below ``path``.

    A directory qualifies when it contains an entry named
    ``SETTINGS.STUDY_GROUP_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current working
            directory.

    Returns:
        The matching directories, without duplicates, in sorted order.
    """
    root = Path.cwd() if path is None else path
    metadata_name = SETTINGS.STUDY_GROUP_METADATA_FILE
    # Sorting gives callers a reproducible order; iterating the bare set
    # would not.
    return sorted({hit.parent for hit in root.rglob(metadata_name)})
[docs]
def find_study_dirs(path: Path | None = None) -> list[Path]:
    """Find all study directories in the directory tree below ``path``.

    A directory qualifies when it contains an entry named
    ``SETTINGS.STUDY_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current working
            directory.

    Returns:
        The matching directories, without duplicates, in sorted order.
    """
    root = Path.cwd() if path is None else path
    metadata_name = SETTINGS.STUDY_METADATA_FILE
    # Sorting gives callers a reproducible order; iterating the bare set
    # would not.
    return sorted({hit.parent for hit in root.rglob(metadata_name)})
[docs]
def find_task_group_dirs(path: Path | None = None) -> list[Path]:
    """Find all task group directories in the directory tree below ``path``.

    A directory qualifies when it contains an entry named
    ``SETTINGS.TASK_GROUP_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current working
            directory.

    Returns:
        The matching directories, without duplicates, in sorted order.
    """
    root = Path.cwd() if path is None else path
    metadata_name = SETTINGS.TASK_GROUP_METADATA_FILE
    # Sorting gives callers a reproducible order; iterating the bare set
    # would not.
    return sorted({hit.parent for hit in root.rglob(metadata_name)})
[docs]
def find_task_dirs(path: Path | None = None) -> list[Path]:
    """Find all task directories in the directory tree below ``path``.

    A directory qualifies when it contains an entry named
    ``SETTINGS.TASK_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current working
            directory.

    Returns:
        The matching directories, without duplicates, in sorted order.
    """
    root = Path.cwd() if path is None else path
    metadata_name = SETTINGS.TASK_METADATA_FILE
    # Sorting gives callers a reproducible order; iterating the bare set
    # would not.
    return sorted({hit.parent for hit in root.rglob(metadata_name)})
[docs]
def find_last_submitted_jobs(
    path: Path | None = None,
    ignore_unrun_jobs: bool = False,
) -> list[Path]:
    """Return the directories of the most recently submitted jobs.

    Calculation directories are located with :func:`find_task_group_dirs`.
    Within each one, the job subdirectory with the highest SLURM job id is
    taken to be the most recently submitted job.

    Args:
        path: The directory specifying or containing calculations. Defaults
            to current working directory.
        ignore_unrun_jobs: If true, no job will be reported for calculation
            directories containing jobs that have not yet been run (i.e., a
            subdirectory without a SLURM output file). Otherwise, such
            subdirectories are skipped and the most recently submitted job
            among the remaining ones will be reported. Defaults to False.

    Returns:
        A list of Paths to directories containing newest jobs for each
        calculation in path or subdirectories of path.
    """
    newest_jobs: list[Path] = []

    for calc_dir in find_task_group_dirs(path):
        newest_job_dir = None
        newest_job_id = None

        for job_dir in calc_dir.iterdir():
            if not job_dir.is_dir():
                continue

            try:
                job_id = get_slurm_job_id(job_dir)
            except FileNotFoundError:
                if ignore_unrun_jobs:
                    # Drop any candidate found so far: whether this
                    # calculation is reported must not depend on the order
                    # in which its subdirectories are visited.
                    newest_job_dir = None
                    break
                continue

            if newest_job_id is None or job_id > newest_job_id:
                newest_job_id = job_id
                newest_job_dir = job_dir

        if newest_job_dir is not None:
            newest_jobs.append(newest_job_dir)

    return newest_jobs
[docs]
def check_job_status(job_id: int) -> str:
    """Look up the status of a SLURM job.

    The status is read from the ``State:`` line of the output of ``seff``.

    Args:
        job_id: The Slurm job ID.

    Raises:
        ValueError: No ``State:`` line of the expected form was found in the
            ``seff`` output.

    Returns:
        A string indicating the job status.
    """
    seff_cmd = ["/usr/bin/env", "seff", str(job_id)]
    report = subprocess.check_output(seff_cmd, encoding="utf-8")

    state_line = re.compile(r"^State: (?P<status>\w+) \(exit code \d*\)$")
    candidates = (state_line.match(text) for text in report.splitlines())
    # The first line of the expected form decides the status.
    found = next((hit for hit in candidates if hit), None)
    if found is not None:
        return found.group("status")

    msg = (
        f"Unable to determine the status of job: {job_id}. Please verify "
        "that this is a valid SLURM job ID"
    )
    raise ValueError(msg)
[docs]
def find_finished_jobs(path: Path | None = None) -> list[Path]:
    """Find the directories and subdirectories containing finished jobs.

    These jobs may have terminated due to errors, but they are no longer
    running. A job counts as finished when its reported status is not one of
    the states that denote a queued, starting, running, or otherwise
    not-yet-terminated job.

    Args:
        path: The directory in which to search. Defaults to None (in which
            case the current working directory is searched).

    Returns:
        A list of Paths pointing to directories containing jobs that have
        finished.
    """
    # Lower-cased Slurm job states for jobs that have not terminated. "idle"
    # is retained from the previous implementation for backward
    # compatibility.
    unfinished_states = frozenset(
        {
            "idle",
            "pending",
            "running",
            "suspended",
            "stopped",
            "configuring",
            "completing",
            "requeued",
            "requeue_hold",
            "requeue_fed",
            "resizing",
            "resv_del_hold",
            "signaling",
            "stage_out",
        }
    )

    last_submitted = find_last_submitted_jobs(
        path=path, ignore_unrun_jobs=True
    )
    finished_jobs = []

    for job_dir in last_submitted:
        job_id = get_slurm_job_id(job_dir=job_dir)
        status = check_job_status(job_id=job_id)
        if status.lower() not in unfinished_states:
            finished_jobs.append(job_dir)

    return finished_jobs
def _find_template_dir(
pattern: re.Pattern[str], path: Path | None = None
) -> list[Path]:
"""Returns list of directories.
Note that if the supplied path matches the specified pattern, its
subdirectories are not searched.
Args:
path: The starting directory for the search.
pattern: A regular expression to match with directory names.
Returns:
The list of directories matching pattern.
"""
if path is None:
path = Path.cwd()
if pattern.fullmatch(path.name):
return [path]
dirs: list[Path] = []
for sub_path in path.iterdir():
if not sub_path.is_dir():
continue
if pattern.fullmatch(sub_path.name):
dirs.append(sub_path)
else:
dirs.extend(_find_template_dir(pattern, sub_path))
return dirs
[docs]
def get_loader() -> jinja2.BaseLoader:
    """Select the Jinja template loader.

    Templates are loaded from ``SETTINGS.TEMPLATE_DIR`` when that setting is
    truthy; otherwise they are loaded from the top-level package of this
    module.
    """
    if not SETTINGS.TEMPLATE_DIR:
        top_level_package = __name__.split(".", maxsplit=1)[0]
        return jinja2.PackageLoader(top_level_package)
    return jinja2.FileSystemLoader(SETTINGS.TEMPLATE_DIR)
[docs]
def copy_permissions_and_ownership(src: Path, dest: Path) -> None:
    """Copy file permissions and ownership bits.

    The mode, owner, and group of ``src`` are read once and then applied to
    ``dest``.

    Args:
        src: The file/directory whose permissions and ownership are to be
            copied.
        dest: The file/directory whose permissions and ownership are to be
            changed.
    """
    # A single stat() call yields one consistent snapshot of all three
    # values and avoids repeated system calls.
    src_stat = src.stat()
    dest.chmod(mode=src_stat.st_mode)
    shutil.chown(path=dest, user=src_stat.st_uid, group=src_stat.st_gid)
[docs]
def get_last_updated(dir_name: Path) -> datetime:
"""Get the date and time of the last updated file in a directory.
Args:
dir_name: The directory in which to search.
Returns:
A :class:`~datetime` object representing the last time a file
in the directory was updated. If `dir_name` is empty, then the last
time that `dir_name` was updated is returned.
"""
files = list(dir_name.iterdir()) or [dir_name]
max_time = max(file.stat().st_mtime for file in files)
# This is the date of the epoch start in Python
epoch_start = datetime(1970, 1, 1, tzinfo=UTC)
return epoch_start + timedelta(seconds=max_time)
[docs]
def template_script(
    dest: Path,
    *,
    scheduler_script_template: str = SETTINGS.TASK_SCRIPT_TEMPLATE,
    context: dict[str, Any],
) -> Path:
    """Write the input script using the template given.

    Args:
        dest: The filename to write. The file must not exist yet.
        scheduler_script_template: The template file to use. Defaults to
            ``SETTINGS.TASK_SCRIPT_TEMPLATE``.
        context: A dictionary containing context variables which will be passed
            to :meth:`jinja2.environment.Template.render`.

    Raises:
        FileExistsError: ``dest`` already exists (the file is opened in
            exclusive-creation mode).

    Returns:
        The path of the written file, i.e., ``dest``.
    """
    # NOTE(review): autoescape=True applies HTML escaping to rendered context
    # values; confirm that this is intended for script templates.
    env = jinja2.Environment(
        loader=get_loader(),
        autoescape=True,
        trim_blocks=True,
        lstrip_blocks=True,
        keep_trailing_newline=True,
    )
    template = env.get_template(scheduler_script_template)

    # Mode "x" refuses to overwrite an existing file.
    with dest.open(mode="x", encoding="utf-8") as file:
        file.write(template.render(**context))

    return dest