Source code for autojob.utils.files

"""Utilities for handling files and directories."""

import contextlib
from datetime import UTC
from datetime import datetime
from datetime import timedelta
import json
import logging
from pathlib import Path
import re
import shutil
import socket
import subprocess
from typing import Any

import jinja2

from autojob import SETTINGS

logger = logging.getLogger(__name__)


[docs] def get_uri(dir_name: str | Path) -> str: """Return the URI path for a directory. This allows files hosted on different file servers to have distinct locations. Adapted from Atomate2. Arg: dir_name: A directory name. Returns: Full URI path, e.g., "fileserver.host.com:/full/path/of/dir_name". """ fullpath = Path(dir_name).absolute() hostname = socket.gethostname() with contextlib.suppress(socket.gaierror, socket.herror): hostname = socket.gethostbyaddr(hostname)[0] return f"{hostname}:{fullpath}"
def get_slurm_job_id(job_dir: Path) -> int:
    """Return the SLURM job id for the job run in the directory ``job_dir``.

    The id is read from the name of the SLURM output file
    (``slurm-<job id>.out``). If several output files are present, the
    numerically largest job id is returned.

    Args:
        job_dir: The directory containing the slurm output file.

    Raises:
        FileNotFoundError: SLURM output file not found.

    Returns:
        The SLURM job id.
    """
    # The dot is escaped so that only a literal ".out" suffix matches.
    slurm_re = re.compile(r"slurm-(\d+)\.out")
    job_ids = [
        int(match[1])
        for path in job_dir.iterdir()
        if (match := slurm_re.fullmatch(path.name))
    ]
    if not job_ids:
        msg = f"No slurm output file found in {'/'.join(job_dir.parts[-4:])}"
        raise FileNotFoundError(msg)
    # Compare ids as integers: sorting file names as strings would rank
    # "slurm-9.out" above "slurm-10.out".
    return max(job_ids)
def _add_memory_to_stats(slurm_job_id: int, job_stats_file: Path) -> None:
    """Record the memory utilized by a Slurm job in its job stats file.

    Runs ``seff`` for the job, extracts the "Memory Utilized" value from its
    output and stores it under ``jobs[0]["max_rss"]`` in the JSON document
    held in ``job_stats_file``. The file is rewritten in place.

    Args:
        slurm_job_id: The Slurm job ID for the job.
        job_stats_file: The JSON file containing the job statistics.

    Raises:
        RuntimeError: ``seff`` failed, its output contained no memory
            utilization, or the job stats file holds no job entry.
    """
    logger.debug(
        "Add memory to job stats file for Slurm job: %s", slurm_job_id
    )
    failure_msg = (
        f"Unable to add memory to job stats file for Slurm job: {slurm_job_id}"
    )
    slurm_cmd = [
        "/usr/bin/env",
        "seff",
        str(slurm_job_id),
    ]

    try:
        output = subprocess.check_output(slurm_cmd, text=True)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(failure_msg) from err

    # Accept any binary-prefixed unit (e.g. "512.00 MB"), not only "GB";
    # otherwise jobs with a small memory footprint could never be recorded.
    memory_re = re.compile(
        r"Memory Utilized: (?P<memory>\d+\.\d{2} [KMGTPE]?B)"
    )
    match = memory_re.search(output)
    if match is None:
        raise RuntimeError(failure_msg)
    memory = match.group("memory")

    with job_stats_file.open(mode="r", encoding="utf-8") as file:
        job_stats = json.load(file)

    try:
        job_stats["jobs"][0]["max_rss"] = memory
    except (KeyError, IndexError, TypeError) as err:
        # The stats file has no job entry to annotate. Fail before the file
        # is reopened for writing so its contents are left untouched.
        raise RuntimeError(failure_msg) from err

    with job_stats_file.open(mode="w", encoding="utf-8") as file:
        json.dump(job_stats, file, indent=4)

    logger.debug(
        "Successfully added memory to job stats file for Slurm job: %s",
        slurm_job_id,
    )
def create_job_stats_file(slurm_job_id: int, job_dir: str | Path) -> Path:
    """Write the statistics of a completed Slurm job to a file.

    The output of ``sacct --json`` for the job is saved to
    ``SETTINGS.SCHEDULER_STATS_FILE`` inside ``job_dir``; the memory
    utilization is then added to that file.

    Args:
        slurm_job_id: The Slurm job ID for the job.
        job_dir: The job directory.

    Raises:
        RuntimeError: Unable to create job stats file.

    Returns:
        A Path to the file containing the job statistics.
    """
    logger.debug(f"Creating job stats file for Slurm job: {slurm_job_id}")
    stats_path = Path(job_dir) / SETTINGS.SCHEDULER_STATS_FILE
    sacct_cmd = ["/usr/bin/env", "sacct", "--json", f"--jobs={slurm_job_id}"]

    try:
        sacct_output = subprocess.check_output(sacct_cmd, text=True)
    except subprocess.CalledProcessError as err:
        msg = (
            f"Unable to create job stats file for job in directory: {job_dir}"
        )
        raise RuntimeError(msg) from err

    with stats_path.open(mode="w", encoding="utf-8") as stream:
        stream.write(sacct_output)

    _add_memory_to_stats(slurm_job_id, stats_path)
    logger.debug(
        f"Successfully created job stats file for Slurm job: {slurm_job_id}"
    )
    return stats_path
def find_study_group_dirs(path: Path | None = None) -> list[Path]:
    """Collect every study group directory found at or below ``path``.

    A directory counts as a study group directory when it contains a file
    named ``SETTINGS.STUDY_GROUP_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current
            working directory.

    Returns:
        The matching directories, without duplicates and in no particular
        order.
    """
    root = path or Path.cwd()
    metadata_files = root.rglob(SETTINGS.STUDY_GROUP_METADATA_FILE)
    # A set removes duplicate parent directories.
    parents = {metadata_file.parent for metadata_file in metadata_files}
    return list(parents)
def find_study_dirs(path: Path | None = None) -> list[Path]:
    """Collect every study directory found at or below ``path``.

    A directory counts as a study directory when it contains a file named
    ``SETTINGS.STUDY_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current
            working directory.

    Returns:
        The matching directories, without duplicates and in no particular
        order.
    """
    if not path:
        path = Path.cwd()
    study_dirs: set[Path] = set()
    for metadata_file in path.rglob(SETTINGS.STUDY_METADATA_FILE):
        study_dirs.add(metadata_file.parent)
    return list(study_dirs)
def find_task_group_dirs(path: Path | None = None) -> list[Path]:
    """Collect every task group directory found at or below ``path``.

    A directory counts as a task group directory when it contains a file
    named ``SETTINGS.TASK_GROUP_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current
            working directory.

    Returns:
        The matching directories, without duplicates and in no particular
        order.
    """
    search_root = path or Path.cwd()
    marker = SETTINGS.TASK_GROUP_METADATA_FILE
    unique_parents = set()
    unique_parents.update(hit.parent for hit in search_root.rglob(marker))
    return list(unique_parents)
def find_task_dirs(path: Path | None = None) -> list[Path]:
    """Collect every task directory found at or below ``path``.

    A directory counts as a task directory when it contains a file named
    ``SETTINGS.TASK_METADATA_FILE``.

    Args:
        path: Top level directory to be searched. Defaults to current
            working directory.

    Returns:
        The matching directories, without duplicates and in no particular
        order.
    """
    start = path or Path.cwd()
    return list(
        {found.parent for found in start.rglob(SETTINGS.TASK_METADATA_FILE)}
    )
def find_last_submitted_jobs(
    path: Path | None = None,
    ignore_unrun_jobs: bool = False,
) -> list[Path]:
    """Return the directories of the most recently submitted jobs.

    Every task group directory found at or below ``path`` is treated as one
    calculation. Within a calculation, each subdirectory is treated as a
    job, and the job with the highest SLURM job id is the most recently
    submitted one.

    Args:
        path: The directory specifying or containing calculations. Defaults
            to current working directory.
        ignore_unrun_jobs: If true, no job will be reported for calculation
            directories containing jobs that have not yet been run (i.e.,
            job directories without a SLURM output file). Otherwise, such
            jobs are skipped and the most recently submitted job among the
            remaining ones is reported. Defaults to False.

    Returns:
        A list of Paths to directories containing the newest job for each
        calculation in path or subdirectories of path.
    """
    newest_jobs: list[Path] = []

    for calc_dir in find_task_group_dirs(path):
        newest_job_dir = None
        newest_job_id = None

        for job_dir in calc_dir.iterdir():
            if not job_dir.is_dir():
                continue

            try:
                job_id = get_slurm_job_id(job_dir)
            except FileNotFoundError:
                if ignore_unrun_jobs:
                    # Drop any candidate found so far. Otherwise the result
                    # would depend on whether the unrun job happened to be
                    # listed before or after the jobs that did run.
                    newest_job_dir = None
                    break
                continue

            if newest_job_id is None or job_id > newest_job_id:
                newest_job_id = job_id
                newest_job_dir = job_dir

        if newest_job_dir is not None:
            newest_jobs.append(newest_job_dir)

    return newest_jobs
def check_job_status(job_id: int) -> str:
    """Determine the status of a SLURM job.

    The status is read from the ``State:`` line printed by ``seff``.

    Args:
        job_id: The Slurm job ID.

    Raises:
        subprocess.CalledProcessError: ``seff`` exited with a non-zero
            status.
        ValueError: The output of ``seff`` contains no ``State:`` line.

    Returns:
        A string indicating the job status (e.g., "COMPLETED").
    """
    output = subprocess.check_output(
        ["/usr/bin/env", "seff", str(job_id)],
        encoding="utf-8",
    )
    # The "(exit code N)" suffix is optional so that a bare "State: RUNNING"
    # line is recognized as well.
    # NOTE(review): seff appears to omit the suffix for jobs that are still
    # pending or running - confirm against the deployed Slurm version.
    status_re = re.compile(
        r"^State: (?P<status>\w+)(?: \(exit code \d*\))?$"
    )

    for line in output.splitlines():
        if match := status_re.match(line):
            return match.group("status")

    msg = (
        f"Unable to determine the status of job: {job_id}. Please verify "
        "that this is a valid SLURM job ID"
    )
    raise ValueError(msg)
def find_finished_jobs(path: Path | None = None) -> list[Path]:
    """Find the directories and subdirectories containing finished jobs.

    These jobs may have terminated due to errors, but they are no longer
    running. Only the most recently submitted job of each calculation is
    considered, and calculations containing unrun jobs are ignored.

    Args:
        path: The directory in which to search. Defaults to None (in which
            case the current working directory is searched).

    Returns:
        A list of Paths pointing to directories containing jobs that have
        finished.
    """
    # Job states that mean the job has not finished yet. "idle" is kept from
    # the previous implementation; the other entries are Slurm job states.
    # NOTE(review): extend this set if the scheduler reports further
    # non-terminal states.
    unfinished_states = {
        "idle",
        "pending",
        "configuring",
        "running",
        "suspended",
        "completing",
        "requeued",
        "resizing",
    }

    last_submitted = find_last_submitted_jobs(
        path=path, ignore_unrun_jobs=True
    )

    finished_jobs = []
    for job_dir in last_submitted:
        job_id = get_slurm_job_id(job_dir=job_dir)
        status = check_job_status(job_id=job_id)
        if status.lower() not in unfinished_states:
            finished_jobs.append(job_dir)

    return finished_jobs
def _find_template_dir( pattern: re.Pattern[str], path: Path | None = None ) -> list[Path]: """Returns list of directories. Note that if the supplied path matches the specified pattern, its subdirectories are not searched. Args: path: The starting directory for the search. pattern: A regular expression to match with directory names. Returns: The list of directories matching pattern. """ if path is None: path = Path.cwd() if pattern.fullmatch(path.name): return [path] dirs: list[Path] = [] for sub_path in path.iterdir(): if not sub_path.is_dir(): continue if pattern.fullmatch(sub_path.name): dirs.append(sub_path) else: dirs.extend(_find_template_dir(pattern, sub_path)) return dirs
def get_loader() -> jinja2.BaseLoader:
    """Return the Jinja template loader.

    Templates are loaded from ``SETTINGS.TEMPLATE_DIR`` when it is set;
    otherwise they are loaded from the top-level package of this module.
    """
    template_dir = SETTINGS.TEMPLATE_DIR
    if not template_dir:
        # First component of the dotted module name.
        top_level_package = __name__.split(".", maxsplit=1)[0]
        return jinja2.PackageLoader(top_level_package)
    return jinja2.FileSystemLoader(template_dir)
def copy_permissions_and_ownership(src: Path, dest: Path) -> None:
    """Copy file permissions and ownership bits.

    Args:
        src: The file/directory whose permissions and ownership are to be
            copied.
        dest: The file/directory whose permissions and ownership are to be
            changed.
    """
    # Stat once so that mode, owner and group come from the same snapshot.
    src_stat = src.stat()
    dest.chmod(mode=src_stat.st_mode)
    shutil.chown(path=dest, user=src_stat.st_uid, group=src_stat.st_gid)
[docs] def get_last_updated(dir_name: Path) -> datetime: """Get the date and time of the last updated file in a directory. Args: dir_name: The directory in which to search. Returns: A :class:`~datetime` object representing the last time a file in the directory was updated. If `dir_name` is empty, then the last time that `dir_name` was updated is returned. """ files = list(dir_name.iterdir()) or [dir_name] max_time = max(file.stat().st_mtime for file in files) # This is the date of the epoch start in Python epoch_start = datetime(1970, 1, 1, tzinfo=UTC) return epoch_start + timedelta(seconds=max_time)
def template_script(
    dest: Path,
    *,
    scheduler_script_template: str = SETTINGS.TASK_SCRIPT_TEMPLATE,
    context: dict[str, Any],
) -> Path:
    """Write the input script using the template given.

    Args:
        dest: The filename to write. The file must not exist yet.
        scheduler_script_template: The template file to use. Defaults to
            ``SETTINGS.TASK_SCRIPT_TEMPLATE``.
        context: A dictionary containing context variables which will be
            passed to :meth:`jinja2.environment.Template.render`.

    Raises:
        FileExistsError: ``dest`` already exists (the file is opened in
            exclusive-creation mode).

    Returns:
        The path of the written script, i.e., ``dest``.
    """
    # NOTE(review): autoescape=True HTML-escapes context values ("&", "<",
    # quotes, ...) - confirm this is intended for the rendered scripts.
    env = jinja2.Environment(
        loader=get_loader(),
        autoescape=True,
        trim_blocks=True,
        lstrip_blocks=True,
        keep_trailing_newline=True,
    )
    template = env.get_template(scheduler_script_template)

    # Mode "x" refuses to overwrite an existing script.
    with dest.open(mode="x", encoding="utf-8") as file:
        file.write(template.render(**context))

    # The annotation promises a Path; previously the function returned None.
    return dest