"""Utilities for parsing data."""
import ast
from collections.abc import Iterable
from datetime import datetime
from datetime import timedelta
import importlib
import logging
import pathlib
import re
from typing import Any
from typing import Literal
from typing import NamedTuple
from typing import TypeVar
from pydantic import ImportString
from autojob.coordinator import job
# Module-level logger named after this module; the parsing helpers in this
# file report their progress through it.
logger = logging.getLogger(__name__)
[docs]
class TimedeltaTuple(NamedTuple):
"""Convenience wrapper around a timedelta object."""
days: int = 0
hours: int = 0
minutes: int = 0
seconds: int = 0
def __str__(self) -> str:
"""Return the result of `.to_slurm_time()`."""
return self.to_slurm_time()
[docs]
@classmethod
def from_timedelta(cls, delta: timedelta) -> "TimedeltaTuple":
"""Break a timedelta instance into days, hours, minutes, and seconds.
Args:
delta: a timedelta instance.
Returns:
A 4-tuple of ints: days, hours, minutes, seconds
"""
days = delta.days
hours = delta.seconds // 3600
minutes = (delta.seconds - (hours * 3600)) // 60
seconds = delta.seconds - (hours * 3600) - (minutes * 60)
return cls(days, hours, minutes, seconds)
[docs]
@classmethod
def from_string(
cls,
string: str,
time_format: Literal["iso", "slurm"] = "slurm",
) -> "TimedeltaTuple":
"""Return a TimedeltaTuple from a string.
Args:
string: the time string to parse.
time_format: One of "iso" or "slurm". Determines how the time
string is parsed.
Returns:
A TimedeltaTuple.
"""
match time_format:
case "slurm":
return TimedeltaTuple.from_slurm_time(string)
case "iso":
dt = datetime.fromisoformat(string)
midnight = datetime(
year=dt.year,
month=dt.month,
day=dt.day,
hour=0,
minute=0,
second=0,
microsecond=0,
tzinfo=dt.tzinfo,
)
return cls.from_timedelta(dt - midnight)
case _:
raise NotImplementedError
[docs]
@classmethod
def from_slurm_time(cls, time: str) -> "TimedeltaTuple":
"""Parses a valid slurm time value into a TimedeltaTuple.
The six formats accepted by Slurm are:
1: minutes
2: minutes:seconds
3: hours:minutes:seconds
4: days-hours
5: days-hours:minutes
6: days-hours:minutes:seconds
Args:
time: the string containing the value of the --time slurm option
Raises:
ValueError: The string is not a valid value of the slurm --time
option. See https://slurm.schedmd.com/sbatch.html for details.
Returns:
A TimedeltaTuple.
"""
logger.debug(f"Parsing slurm time from {time=!r}")
pattern1 = (
r"^(?:(?=\d+:\d+:\d+$)(?P<hours>\d+):)?(?P<minutes>\d+)"
r"(:(?P<seconds>\d+))?$"
)
match1 = re.match(pattern1, time.strip())
pattern2 = (
r"^(?P<days>\d+)-(?P<hours>\d+)(:(?P<minutes>\d+)(:"
r"(?P<seconds>\d+))?)?$"
)
match2 = re.match(pattern2, time.strip())
if match1:
match = match1
elif match2:
match = match2
else:
msg = f"{time} is not a valid value of the slurm --time option"
raise ValueError(msg)
parsed_time_denominations = {
k: int(v)
for k, v in match.groupdict().items()
if k and v is not None
}
logger.debug(
f"Successfully parsed slurm time from {time=!r}. "
f"Values: {parsed_time_denominations=!r}"
)
return cls(**parsed_time_denominations)
[docs]
def to_timedelta(self) -> timedelta:
"""Convert a `TimedeltaTuple` to a `timedelta` instance."""
return timedelta(
days=self.days,
hours=self.hours,
minutes=self.minutes,
seconds=self.seconds,
)
# TODO: Use format spec
[docs]
def to_slurm_time(self) -> str:
"""Convert `TimedeltaTuple` into a SLURM-compatible time format."""
days = f"{self.days}-" if self.days else ""
hours = TimedeltaTuple.format_time(self.hours)
minutes = TimedeltaTuple.format_time(self.minutes)
seconds = TimedeltaTuple.format_time(self.seconds)
return f"{days}{hours}:{minutes}:{seconds}"
# Generic type variable referenced by function annotations in this module.
_T = TypeVar("_T")
[docs]
def vectorize_underscored_data(
    rows: list[str],
) -> tuple[list[str], list[list[str]]]:
    """Turns rows of underscored data into columns.

    An example of supported data is that which is returned by the SLURM command
    `sacct`::

        Partition     MaxRSS   NNodes               Start
        --------- ---------- -------- -------------------
             razi                   1 2022-07-29T09:48:15
                   18049744K        1 2022-07-29T09:48:15
                           0        1 2022-07-29T09:48:15

    The second row (the delimiters) defines the layout: every run of
    non-whitespace characters in it marks the character span of one column.
    The header row and all data rows are sliced with those same spans, so
    entries are returned with their padding intact.

    Args:
        rows: A list of strings read from a file containing the output from
            a Slurm job stats file or sacct. The first row holds the headers
            and the second row holds the delimiters.

    Raises:
        IndexError: Fewer than two rows were supplied, so there is no
            delimiter row.

    Returns:
        Vectorized job stats are returned as a tuple (headers, columns) where
        headers is a list of strings representing the headers used in the job
        stats file and columns is a list of lists of strings representing the
        remaining entries in the column. The header delimiters are excluded.
    """
    logger.debug(f"Vectorizing data:\n\n{rows}\n")
    delimiters = rows[1]
    # Use the actual position of each delimiter run rather than accumulating
    # widths, so leading whitespace or wider gaps between columns cannot
    # shift the slices.
    spans = [found.span() for found in re.finditer(r"\S+", delimiters)]
    headers: list[str] = [rows[0][start:stop] for start, stop in spans]
    columns: list[list[str]] = [
        [row[start:stop] for row in rows[2:]] for start, stop in spans
    ]
    logger.debug(f"Successfully vectorized data:\n{headers!r}\n{columns!r}\n")
    return headers, columns
[docs]
def reduce_sparse_vector(vector: Iterable[str]) -> str:
    """Returns the first non-blank entry in the sparse vector.

    An entry is considered blank when nothing is left after removing its
    space characters, i.e., it is empty or made up solely of spaces.

    Args:
        vector: An iterable of strings.

    Returns:
        The first non-blank entry, exactly as it appears in the vector (any
        surrounding spaces are retained).

    Raises:
        ValueError: The vector is empty or every entry in it is blank.
    """
    for entry in vector:
        if entry.replace(" ", ""):
            return entry
    msg = "No values in sparse vector"
    raise ValueError(msg)
[docs]
def parse_job_stats_file(
    stats_file: pathlib.Path,
) -> dict[str, str | None]:
    """Parse information from a job stats file into a dictionary.

    The file is read as a header row, a delimiter row, and any number of data
    rows (see `vectorize_underscored_data`). For each column, the first
    non-blank entry is kept as that column's value.

    Args:
        stats_file: Path to jobstats.txt file.

    Raises:
        ValueError: The file has fewer than two lines (so it has no header
            and delimiter rows) or headers listed in `job.JOB_STATS_FIELDS`
            are missing from the file.

    Returns:
        The parsed job stats dictionary, mapping each header (with spaces
        removed) to its value (with spaces removed), or to None if the
        column has no non-blank entries.

        Note that no validation/conversion is done to the field values.
        Conversion to valid (more useful) Python values can be performed
        using `SchedulerOutputs.model_validate`.
    """
    logger.debug(f"Parsing job stats from: {stats_file}")
    with stats_file.open(encoding="utf-8") as file:
        # Drop line terminators up front so that a row shorter than the
        # delimiter row can never leak a newline into a header or value.
        rows = [line.rstrip("\n") for line in file]

    if len(rows) < 2:
        msg = "Unable to parse job stats file."
        raise ValueError(msg)

    headers, columns = vectorize_underscored_data(rows=rows)

    # Create job stats dictionary. Headers and columns are produced pairwise,
    # so ``strict=True`` only guards against that invariant being broken.
    job_stats: dict[str, str | None] = {}
    for raw_header, column in zip(headers, columns, strict=True):
        header = raw_header.replace(" ", "")
        try:
            value = reduce_sparse_vector(vector=column).replace(" ", "")
        except ValueError:
            logger.info(f"No value found for {header}")
            value = None
        job_stats[header] = value

    missing_headers = [
        header for header in job.JOB_STATS_FIELDS if header not in job_stats
    ]
    if missing_headers:
        missing = ", ".join(missing_headers)
        msg = f"Missing headers in job stats file: {missing}."
        raise ValueError(msg)

    logger.debug(f"Successfully parsed job stats from: {stats_file}")
    return job_stats
[docs]
def parse_job_error(slurm_file: pathlib.Path) -> job.JobError | None:
    """Look up why a job was terminated by scanning a slurm file.

    The file is read line by line until a line containing
    "Cancelled due to time limit" or "Cancelled due to memory limit" is
    found; later lines are not inspected.

    Args:
        slurm_file: A Path pointing to the slurm file to scan.

    Returns:
        The `JobError` built from the first matching line ("time limit" or
        "memory limit"), or None if no line matches.
    """
    logger.info(f"Parsing job error in {slurm_file}")
    limit_pattern = re.compile(r"Cancelled due to (time|memory) limit")

    with slurm_file.open(encoding="utf-8") as stream:
        # Lazily search each line and stop at the first hit.
        hits = (limit_pattern.search(text) for text in stream)
        first_hit = next((hit for hit in hits if hit), None)

    if first_hit is None:
        logger.info("No job error found")
        return None

    cause = first_hit.group(1)
    error = job.JobError(f"{cause} limit")
    logger.info(f"Job error found: {error}")
    return error
[docs]
def import_class(class_string: ImportString[_T]) -> _T:
    """Import a class using its fully qualified name.

    The name is split at its last dot: everything before it is imported as a
    module and the part after it is looked up as an attribute of that module.

    Args:
        class_string: The fully qualified name of the class. For example,
            autojob.hpc.SchedulerInputs.

    Raises:
        TypeError: `class_string` is not a string.
        ValueError: `class_string` has no module part (e.g., it contains no
            dot).
        ImportError: The module part cannot be imported.
        AttributeError: The module has no attribute with the class name.

    Returns:
        The class.
    """
    if not isinstance(class_string, str):
        msg = (
            "Expected the fully qualified class name as a string, got "
            f"{type(class_string).__name__}"
        )
        raise TypeError(msg)

    module_name, _, class_name = class_string.rpartition(".")
    if not module_name:
        # importlib would reject the empty module name with the same
        # exception type but a message that does not mention the input.
        msg = (
            f"{class_string!r} is not a fully qualified name of the form "
            "'package.module.ClassName'"
        )
        raise ValueError(msg)

    module = importlib.import_module(module_name)
    return getattr(module, class_name)