Source code for autojob.utils.parsing

"""Utilities for parsing data."""

from datetime import datetime
from datetime import timedelta
import json
import logging
from pathlib import Path
import re
from typing import Any
from typing import Literal
from typing import NamedTuple
from typing import TextIO

from autojob import SETTINGS

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


[docs] class TimedeltaTuple(NamedTuple): """Convenience wrapper around a timedelta object.""" days: int = 0 hours: int = 0 minutes: int = 0 seconds: int = 0 def __str__(self) -> str: """Return the result of `.to_slurm_time()`.""" return self.to_slurm_time()
[docs] @classmethod def from_timedelta(cls, delta: timedelta) -> "TimedeltaTuple": """Break a timedelta instance into days, hours, minutes, and seconds. Args: delta: a timedelta instance. Returns: A 4-tuple of ints: days, hours, minutes, seconds """ days = delta.days hours = delta.seconds // 3600 minutes = (delta.seconds - (hours * 3600)) // 60 seconds = delta.seconds - (hours * 3600) - (minutes * 60) return cls(days, hours, minutes, seconds)
[docs] @classmethod def from_string( cls, string: str, time_format: Literal["iso", "slurm"] = "slurm", ) -> "TimedeltaTuple": """Return a TimedeltaTuple from a string. Args: string: the time string to parse. time_format: One of "iso" or "slurm". Determines how the time string is parsed. Returns: A TimedeltaTuple. """ match time_format: case "slurm": return TimedeltaTuple.from_slurm_time(string) case "iso": dt = datetime.fromisoformat(string) midnight = datetime( year=dt.year, month=dt.month, day=dt.day, hour=0, minute=0, second=0, microsecond=0, tzinfo=dt.tzinfo, ) return cls.from_timedelta(dt - midnight) case _: raise NotImplementedError
[docs] @classmethod def from_slurm_time(cls, time: str) -> "TimedeltaTuple": """Parses a valid slurm time value into a TimedeltaTuple. The six formats accepted by Slurm are: 1: minutes 2: minutes:seconds 3: hours:minutes:seconds 4: days-hours 5: days-hours:minutes 6: days-hours:minutes:seconds Args: time: the string containing the value of the --time slurm option Raises: ValueError: The string is not a valid value of the slurm --time option. See https://slurm.schedmd.com/sbatch.html for details. Returns: A TimedeltaTuple. """ logger.debug(f"Parsing slurm time from {time=!r}") pattern1 = ( r"^(?:(?=\d+:\d+:\d+$)(?P<hours>\d+):)?(?P<minutes>\d+)" r"(:(?P<seconds>\d+))?$" ) match1 = re.match(pattern1, time.strip()) pattern2 = ( r"^(?P<days>\d+)-(?P<hours>\d+)(:(?P<minutes>\d+)(:" r"(?P<seconds>\d+))?)?$" ) match2 = re.match(pattern2, time.strip()) if match1: match = match1 elif match2: match = match2 else: msg = f"{time} is not a valid value of the slurm --time option" raise ValueError(msg) parsed_time_denominations = { k: int(v) for k, v in match.groupdict().items() if k and v is not None } logger.debug( f"Successfully parsed slurm time from {time=!r}. " f"Values: {parsed_time_denominations=!r}" ) return cls(**parsed_time_denominations)
[docs] def to_timedelta(self) -> timedelta: """Convert a `TimedeltaTuple` to a `timedelta` instance.""" return timedelta( days=self.days, hours=self.hours, minutes=self.minutes, seconds=self.seconds, )
[docs] @staticmethod def format_time(time_denomination: int) -> str: """Format time into a 0-padded integer.""" if 0 <= time_denomination < 10: # noqa: PLR2004 return f"0{time_denomination}" return str(time_denomination)
# TODO: Use format spec
[docs] def to_slurm_time(self) -> str: """Convert `TimedeltaTuple` into a SLURM-compatible time format.""" days = f"{self.days}-" if self.days else "" hours = TimedeltaTuple.format_time(self.hours) minutes = TimedeltaTuple.format_time(self.minutes) seconds = TimedeltaTuple.format_time(self.seconds) return f"{days}{hours}:{minutes}:{seconds}"
def parse_job_stats_file(
    dir_name: Path,
) -> dict[str, Any]:
    """Parse the scheduler job stats file stored in a job directory.

    The file is read as JSON and only the first entry of its ``"jobs"`` list
    is used.

    Args:
        dir_name: Directory containing the job stats file. The file name is
            taken from ``SETTINGS.SCHEDULER_STATS_FILE``.

    Returns:
        A dictionary mapping job stats to their values, with the keys:

        - ``"partition"``: copied from the record.
        - ``"elapsed"``: ``timedelta`` built from the record's elapsed
          seconds.
        - ``"idle_time"``: ``timedelta`` between submission and start.
        - ``"job_id"``: copied from the record.
        - ``"max_rss"``: the record's ``max_rss`` converted to kilobytes and
          truncated to an ``int``.
        - ``"state"``: the current state; when the record holds a list of
          states, its first element.
        - ``"nodes"``: the node list of the record's first step.

    Raises:
        FileNotFoundError: No job stats file exists in ``dir_name``.
    """
    logger.debug("Parsing job stats from directory: %s", dir_name)
    stats_file = dir_name.joinpath(SETTINGS.SCHEDULER_STATS_FILE)

    if not stats_file.exists():
        msg = f"No job stats file found in directory: {dir_name}"
        raise FileNotFoundError(msg)

    with stats_file.open(encoding="utf-8") as file:
        raw_data: dict[str, Any] = json.load(file)["jobs"][0]

    # ``max_rss`` is expected to be two whitespace-separated tokens: a number
    # followed by its units.
    val, units = raw_data["max_rss"].split()
    max_rss = convert(float(val), units, "KB")

    time_stats = raw_data["time"]

    # ! Unclear what determines the order of states
    state = raw_data["state"]["current"]
    if isinstance(state, list):
        state = state[0]

    job_stats = {
        "partition": raw_data["partition"],
        "elapsed": timedelta(seconds=time_stats["elapsed"]),
        "idle_time": timedelta(
            seconds=time_stats["start"] - time_stats["submission"]
        ),
        "job_id": raw_data["job_id"],
        "max_rss": int(max_rss),
        "state": state,
        "nodes": raw_data["steps"][0]["nodes"]["list"],
    }
    logger.debug("Successfully parsed job stats from file: %s", stats_file)
    return job_stats
def parse_time_from_slurm_script(script: TextIO) -> timedelta:
    """Parse the time limit from a SLURM submission script.

    The script is scanned line by line and the first ``#SBATCH`` directive
    whose first option sets a time limit is used. The following spellings
    are recognised::

        #SBATCH --time=1:00:00
        #SBATCH --time 1:00:00
        #SBATCH -t 1:00:00
        #SBATCH -t1:00:00
        #SBATCH -t=1:00:00

    Anything after the value on the same line (for example a trailing
    comment) is ignored.

    Args:
        script: A line-iterable text stream containing the submission script.

    Raises:
        ValueError: No time directive was found, or the value found is not a
            valid value of the slurm --time option.

    Returns:
        The requested time limit.
    """
    # The value must start with a digit, so look-alike options such as
    # ``--time-min`` are never mistaken for the time limit. It ends at the
    # first whitespace or ``#``.
    time_re = re.compile(
        r"^#SBATCH\s*-(?:-time(?:\s*=\s*|\s+)|t\s*=?\s*)"
        r"(?P<time>\d[^\s#]*)"
    )

    for line in script:
        match = time_re.match(line)
        if match:
            return TimedeltaTuple.from_slurm_time(
                match.group("time")
            ).to_timedelta()

    msg = "Unable to parse time from slurm script"
    raise ValueError(msg)
def _determine_conversion_factor(units: str) -> float: prefixes = ["", "k", "m", "g", "t"] units = units.lower().rstrip("b") try: exponent = prefixes.index(units) return 1e3**exponent except ValueError as err: msg = f"Unknown units specified: {units}" raise ValueError(msg) from err
def convert(memory: float, from_units: str, to_units: str) -> float:
    """Convert a memory quantity from one unit to another.

    Units can be specified using uppercase, lowercase, one-, or two-letter
    abbreviations. E.g., 'K', 'k', 'KB', 'kb' are all interpreted as
    kilobytes.

    NOTE(review): this function was previously described as converting "in
    binary", but the scale factors supplied by
    ``_determine_conversion_factor`` are powers of 1000 - confirm which is
    intended.

    Args:
        memory (float): memory to be converted.
        from_units (str): The units from which memory is to be converted.
        to_units (str): The units to which memory is to be converted.

    Returns:
        float: The memory in the desired units.
    """
    # Resolve the source units first, then the target units, so an unknown
    # source unit is the one reported when both are invalid.
    source_scale = _determine_conversion_factor(units=from_units)
    target_scale = _determine_conversion_factor(units=to_units)
    ratio = source_scale / target_scale
    return ratio * memory