"""Utilities for parsing data."""
from datetime import datetime
from datetime import timedelta
import json
import logging
from pathlib import Path
import re
from typing import Any
from typing import Literal
from typing import NamedTuple
from typing import TextIO
from autojob import SETTINGS
logger = logging.getLogger(__name__)
[docs]
class TimedeltaTuple(NamedTuple):
"""Convenience wrapper around a timedelta object."""
days: int = 0
hours: int = 0
minutes: int = 0
seconds: int = 0
def __str__(self) -> str:
"""Return the result of `.to_slurm_time()`."""
return self.to_slurm_time()
[docs]
@classmethod
def from_timedelta(cls, delta: timedelta) -> "TimedeltaTuple":
"""Break a timedelta instance into days, hours, minutes, and seconds.
Args:
delta: a timedelta instance.
Returns:
A 4-tuple of ints: days, hours, minutes, seconds
"""
days = delta.days
hours = delta.seconds // 3600
minutes = (delta.seconds - (hours * 3600)) // 60
seconds = delta.seconds - (hours * 3600) - (minutes * 60)
return cls(days, hours, minutes, seconds)
[docs]
@classmethod
def from_string(
cls,
string: str,
time_format: Literal["iso", "slurm"] = "slurm",
) -> "TimedeltaTuple":
"""Return a TimedeltaTuple from a string.
Args:
string: the time string to parse.
time_format: One of "iso" or "slurm". Determines how the time
string is parsed.
Returns:
A TimedeltaTuple.
"""
match time_format:
case "slurm":
return TimedeltaTuple.from_slurm_time(string)
case "iso":
dt = datetime.fromisoformat(string)
midnight = datetime(
year=dt.year,
month=dt.month,
day=dt.day,
hour=0,
minute=0,
second=0,
microsecond=0,
tzinfo=dt.tzinfo,
)
return cls.from_timedelta(dt - midnight)
case _:
raise NotImplementedError
[docs]
@classmethod
def from_slurm_time(cls, time: str) -> "TimedeltaTuple":
"""Parses a valid slurm time value into a TimedeltaTuple.
The six formats accepted by Slurm are:
1: minutes
2: minutes:seconds
3: hours:minutes:seconds
4: days-hours
5: days-hours:minutes
6: days-hours:minutes:seconds
Args:
time: the string containing the value of the --time slurm option
Raises:
ValueError: The string is not a valid value of the slurm --time
option. See https://slurm.schedmd.com/sbatch.html for details.
Returns:
A TimedeltaTuple.
"""
logger.debug(f"Parsing slurm time from {time=!r}")
pattern1 = (
r"^(?:(?=\d+:\d+:\d+$)(?P<hours>\d+):)?(?P<minutes>\d+)"
r"(:(?P<seconds>\d+))?$"
)
match1 = re.match(pattern1, time.strip())
pattern2 = (
r"^(?P<days>\d+)-(?P<hours>\d+)(:(?P<minutes>\d+)(:"
r"(?P<seconds>\d+))?)?$"
)
match2 = re.match(pattern2, time.strip())
if match1:
match = match1
elif match2:
match = match2
else:
msg = f"{time} is not a valid value of the slurm --time option"
raise ValueError(msg)
parsed_time_denominations = {
k: int(v)
for k, v in match.groupdict().items()
if k and v is not None
}
logger.debug(
f"Successfully parsed slurm time from {time=!r}. "
f"Values: {parsed_time_denominations=!r}"
)
return cls(**parsed_time_denominations)
[docs]
def to_timedelta(self) -> timedelta:
"""Convert a `TimedeltaTuple` to a `timedelta` instance."""
return timedelta(
days=self.days,
hours=self.hours,
minutes=self.minutes,
seconds=self.seconds,
)
# TODO: Use format spec
[docs]
def to_slurm_time(self) -> str:
"""Convert `TimedeltaTuple` into a SLURM-compatible time format."""
days = f"{self.days}-" if self.days else ""
hours = TimedeltaTuple.format_time(self.hours)
minutes = TimedeltaTuple.format_time(self.minutes)
seconds = TimedeltaTuple.format_time(self.seconds)
return f"{days}{hours}:{minutes}:{seconds}"
[docs]
def parse_job_stats_file(
    dir_name: Path,
) -> dict[str, float | int | str]:
    """Parse the scheduler job stats file found in a directory.

    The file named by ``SETTINGS.SCHEDULER_STATS_FILE`` is looked up inside
    ``dir_name`` and read as JSON. Only the first entry of its top-level
    ``"jobs"`` list is used.

    Args:
        dir_name: Directory containing the job stats file.

    Raises:
        FileNotFoundError: The stats file does not exist in ``dir_name``.

    Returns:
        A dictionary with the keys ``partition``, ``elapsed``,
        ``idle_time``, ``job_id``, ``max_rss``, ``state``, and ``nodes``.
        ``elapsed`` and ``idle_time`` are ``timedelta`` instances and
        ``max_rss`` is an ``int`` obtained by converting to "KB".
        NOTE(review): the return annotation does not list ``timedelta``
        even though two values are of that type - confirm and widen.
    """
    logger.debug(f"Parsing job stats from directory: {dir_name}")
    stats_file = dir_name.joinpath(SETTINGS.SCHEDULER_STATS_FILE)

    # Guard clause: nothing to parse when the file is absent.
    if not stats_file.exists():
        msg = f"No job stats file found in directory: {dir_name}"
        raise FileNotFoundError(msg)

    with stats_file.open(encoding="utf-8") as file:
        payload = json.load(file)
    raw_data: dict[str, Any] = payload["jobs"][0]

    # "max_rss" is a "<number> <units>" string.
    magnitude, rss_units = raw_data["max_rss"].split()
    max_rss = convert(float(magnitude), rss_units, "KB")

    time_stats = raw_data["time"]

    # The current state may be a bare value or a list; take the first entry
    # of a list.
    state = raw_data["state"]["current"]
    if isinstance(state, list):
        state = state[0]

    job_stats = {
        "partition": raw_data["partition"],
        "elapsed": timedelta(seconds=time_stats["elapsed"]),
        # Time between submission and start.
        "idle_time": timedelta(
            seconds=time_stats["start"] - time_stats["submission"]
        ),
        "job_id": raw_data["job_id"],
        "max_rss": int(max_rss),
        # ! Unclear what determines the order of states
        "state": state,
        "nodes": raw_data["steps"][0]["nodes"]["list"],
    }
    logger.debug(f"Successfully parsed job stats from file: {stats_file}")
    return job_stats
[docs]
def parse_time_from_slurm_script(script: TextIO) -> timedelta:
    """Parse the time limit from a SLURM submission script.

    The script is scanned line by line for an ``#SBATCH`` directive that
    sets the time limit. The option may be written as ``--time`` or ``-t``
    and separated from its value by either ``=`` or whitespace, e.g.
    ``#SBATCH --time=1:00:00`` or ``#SBATCH -t 1:00:00``. The first
    directive with a non-empty value is used.

    Args:
        script: An iterable of the script's lines (e.g., an open text file).

    Raises:
        ValueError: No time directive with a value was found, or the value
            found is not a valid slurm time (raised by
            ``TimedeltaTuple.from_slurm_time``).

    Returns:
        The time limit as a ``timedelta``.
    """
    # Requiring "=" or whitespace right after the option name keeps other
    # options that merely start with "--time" (e.g. "--time-min") from
    # matching.
    time_re = re.compile(r"^#SBATCH\s*-(?:-time|t)(?:=|\s+)(?P<time>.*)$")
    for line in script:
        match = time_re.match(line)
        if match is None:
            continue
        slurm_time = match.group("time")
        # An empty value is skipped so a later directive can still be found.
        if slurm_time:
            return TimedeltaTuple.from_slurm_time(slurm_time).to_timedelta()
    msg = "Unable to parse time from slurm script"
    raise ValueError(msg)
def _determine_conversion_factor(units: str) -> float:
    """Return the multiplier that turns a quantity in ``units`` into bytes.

    The unit string is lowercased and any trailing "b" characters are
    removed; what remains must be one of "", "k", "m", "g", or "t". Each
    step up that list multiplies the factor by 1000.

    Raises:
        ValueError: The normalized unit is not one of the known prefixes.
    """
    prefix = units.lower().rstrip("b")
    try:
        power = ["", "k", "m", "g", "t"].index(prefix)
    except ValueError as err:
        msg = f"Unknown units specified: {prefix}"
        raise ValueError(msg) from err
    return 1e3**power


def convert(memory: float, from_units: str, to_units: str) -> float:
    """Convert a memory quantity from one unit to another.

    Units can be specified using uppercase, lowercase, one-, or two-letter
    abbreviations.

    E.g., 'K', 'k', 'KB', 'kb' are all interpreted as kilobytes.

    NOTE(review): successive prefixes differ by a factor of 1000 (decimal),
    not 1024 - confirm this matches the units reported by the scheduler.

    Args:
        memory (float): memory to be converted.
        from_units (str): The units from which memory is to be converted.
        to_units (str): The units to which memory is to be converted.

    Raises:
        ValueError: Either unit string is not recognized.

    Returns:
        float: The memory in the desired units.
    """
    source_scale = _determine_conversion_factor(units=from_units)
    target_scale = _determine_conversion_factor(units=to_units)
    ratio = source_scale / target_scale
    return ratio * memory