Source code for autojob.utils.parsing

"""Utilities for parsing data."""

from datetime import datetime
from datetime import timedelta
import json
import logging
from pathlib import Path
import re
from typing import Any
from typing import Literal
from typing import NamedTuple

from autojob import SETTINGS

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


[docs] class TimedeltaTuple(NamedTuple): """Convenience wrapper around a timedelta object.""" days: int = 0 hours: int = 0 minutes: int = 0 seconds: int = 0 def __str__(self) -> str: """Return the result of `.to_slurm_time()`.""" return self.to_slurm_time()
[docs] @classmethod def from_timedelta(cls, delta: timedelta) -> "TimedeltaTuple": """Break a timedelta instance into days, hours, minutes, and seconds. Args: delta: a timedelta instance. Returns: A 4-tuple of ints: days, hours, minutes, seconds """ days = delta.days hours = delta.seconds // 3600 minutes = (delta.seconds - (hours * 3600)) // 60 seconds = delta.seconds - (hours * 3600) - (minutes * 60) return cls(days, hours, minutes, seconds)
[docs] @classmethod def from_string( cls, string: str, time_format: Literal["iso", "slurm"] = "slurm", ) -> "TimedeltaTuple": """Return a TimedeltaTuple from a string. Args: string: the time string to parse. time_format: One of "iso" or "slurm". Determines how the time string is parsed. Returns: A TimedeltaTuple. """ match time_format: case "slurm": return TimedeltaTuple.from_slurm_time(string) case "iso": dt = datetime.fromisoformat(string) midnight = datetime( year=dt.year, month=dt.month, day=dt.day, hour=0, minute=0, second=0, microsecond=0, tzinfo=dt.tzinfo, ) return cls.from_timedelta(dt - midnight) case _: raise NotImplementedError
[docs] @classmethod def from_slurm_time(cls, time: str) -> "TimedeltaTuple": """Parses a valid slurm time value into a TimedeltaTuple. The six formats accepted by Slurm are: 1: minutes 2: minutes:seconds 3: hours:minutes:seconds 4: days-hours 5: days-hours:minutes 6: days-hours:minutes:seconds Args: time: the string containing the value of the --time slurm option Raises: ValueError: The string is not a valid value of the slurm --time option. See https://slurm.schedmd.com/sbatch.html for details. Returns: A TimedeltaTuple. """ logger.debug(f"Parsing slurm time from {time=!r}") pattern1 = ( r"^(?:(?=\d+:\d+:\d+$)(?P<hours>\d+):)?(?P<minutes>\d+)" r"(:(?P<seconds>\d+))?$" ) match1 = re.match(pattern1, time.strip()) pattern2 = ( r"^(?P<days>\d+)-(?P<hours>\d+)(:(?P<minutes>\d+)(:" r"(?P<seconds>\d+))?)?$" ) match2 = re.match(pattern2, time.strip()) if match1: match = match1 elif match2: match = match2 else: msg = f"{time} is not a valid value of the slurm --time option" raise ValueError(msg) parsed_time_denominations = { k: int(v) for k, v in match.groupdict().items() if k and v is not None } logger.debug( f"Successfully parsed slurm time from {time=!r}. " f"Values: {parsed_time_denominations=!r}" ) return cls(**parsed_time_denominations)
[docs] def to_timedelta(self) -> timedelta: """Convert a `TimedeltaTuple` to a `timedelta` instance.""" return timedelta( days=self.days, hours=self.hours, minutes=self.minutes, seconds=self.seconds, )
[docs] @staticmethod def format_time(time_denomination: int) -> str: """Format time into a 0-padded integer.""" if 0 <= time_denomination < 10: # noqa: PLR2004 return f"0{time_denomination}" return str(time_denomination)
# TODO: Use format spec
[docs] def to_slurm_time(self) -> str: """Convert `TimedeltaTuple` into a SLURM-compatible time format.""" days = f"{self.days}-" if self.days else "" hours = TimedeltaTuple.format_time(self.hours) minutes = TimedeltaTuple.format_time(self.minutes) seconds = TimedeltaTuple.format_time(self.seconds) return f"{days}{hours}:{minutes}:{seconds}"
def parse_job_stats_file(
    dir_name: Path,
) -> dict[str, Any]:
    """Parse the scheduler job stats file located in a job directory.

    The file is read as JSON and only the first entry of its ``"jobs"`` list
    (and the first entry of that job's ``"steps"`` list) is used.

    Args:
        dir_name: Directory containing the job stats file. The file name is
            taken from ``SETTINGS.SCHEDULER_STATS_FILE``.

    Raises:
        FileNotFoundError: No job stats file exists in ``dir_name``.

    Returns:
        A dictionary mapping job stats to their values, with the keys
        ``"partition"``, ``"elapsed"`` (timedelta), ``"idle_time"``
        (timedelta between submission and start), ``"job_id"``,
        ``"max_rss"`` (int), ``"state"``, and ``"nodes"``.
    """
    logger.debug(f"Parsing job stats from directory: {dir_name}")
    stats_file = dir_name.joinpath(SETTINGS.SCHEDULER_STATS_FILE)

    if not stats_file.exists():
        msg = f"No job stats file found in directory: {dir_name}"
        raise FileNotFoundError(msg)

    with stats_file.open(encoding="utf-8") as file:
        raw_data: dict[str, Any] = json.load(file)["jobs"][0]

    first_step = raw_data["steps"][0]
    data_stats = first_step["tres"]["consumed"]["total"]
    time_stats = raw_data["time"]

    job_stats = {
        "partition": raw_data["partition"],
        "elapsed": timedelta(seconds=time_stats["elapsed"]),
        "idle_time": timedelta(
            seconds=time_stats["start"] - time_stats["submission"]
        ),
        "job_id": raw_data["job_id"],
        # NOTE(review): relies on the second consumed-resource entry being
        # the memory figure, and on its count being in bytes (scaled by 1e6
        # here) - confirm against the scheduler's JSON output.
        "max_rss": int(data_stats[1]["count"] / 1e6),
        # ! Unclear what determines the order of states
        "state": raw_data["state"]["current"],
        "nodes": first_step["nodes"]["list"],
    }
    logger.debug(f"Successfully parsed job stats from file: {stats_file}")
    return job_stats