Source code for autojob.cli.run

"""This module defines the autojob run CLI."""

from datetime import UTC
from datetime import datetime
from datetime import timedelta
import logging
from pathlib import Path
import re
import subprocess
import sys
from threading import Thread
import time
from typing import NoReturn
from typing import TextIO

import click

from autojob import SETTINGS
from autojob.utils.parsing import TimedeltaTuple

logger = logging.getLogger(__name__)


def _parse_time_from_slurm_script(script: TextIO) -> timedelta:
    time_re = re.compile(r"^#SBATCH\s*-(-time)|(t)=(?P<time>.*)$")
    for line in script:
        match = time_re.match(line)
        if match:
            time = match.group("time")
            if time:
                return TimedeltaTuple.from_slurm_time(time).to_timedelta()
    msg = "Unable to parse time from slurm script"
    raise ValueError(msg)


[docs] def run_script(script: Path, time_limit: float, frequency: float) -> NoReturn: """Run a script with a time limit. Args: script: The script to run. time_limit: The time limit in seconds. frequency: How frequently to check if the underlying function has completed. """ def execute(): _ = subprocess.run([sys.executable, str(script)], check=False) logger.info("Executing script: %s", script) thread = Thread(target=execute) thread.daemon = True start = datetime.now(tz=UTC) thread.start() logger.debug("Will abort after %.2f hours.", time_limit / 3600) logger.debug("Checking every %.2f hours.", frequency / 3600) elapsed = (datetime.now(tz=UTC) - start).total_seconds() while elapsed < time_limit: time.sleep(frequency) elapsed = (datetime.now(tz=UTC) - start).total_seconds() logging.debug("Checking... Elapsed time: %.2f hr", elapsed / 3600) if not thread.is_alive(): logging.info(" Job appears finished. Exiting.") sys.exit(0) logger.info("Time limit reached. Elapsed time: %.2f hr", elapsed / 3600) sys.exit(124)
@click.command( "init", context_settings={"help_option_names": ["-h", "--help"]} ) @click.option( "-t", "--time-source", default=SETTINGS.DEFAULT_TASK_SCRIPT_FILE, help="The file from which to parse the time limit", show_default=True, type=click.Path(exists=True, dir_okay=False, path_type=Path), ) @click.option( "-b", "--buffer", default=0.05, help="Specify the time buffer for the task. A value greater than or equal " "to 0 and less than 1 will be interpreted as the fraction of the total " "time to use as a buffer. Values greater than or equal to 1 will be " "interpreted as seconds.", show_default=True, type=float, ) @click.option( "-f", "--frequency", default=30, help="How frequently the process is checked for completion in seconds.", show_default=True, type=float, ) @click.argument( "script", default=SETTINGS.DEFAULT_CALCULATION_SCRIPT_FILE, type=click.Path(exists=True, dir_okay=False, path_type=Path), ) def main( # noqa: D417 time_source: Path, script: Path, buffer: float, frequency: float ) -> None: """Run a Python script under a time limit. Args: script: The Python script to run. """ with time_source.open(mode="r", encoding="utf-8") as file: time = _parse_time_from_slurm_script(file) if 0 <= buffer < 1: time_limit = time.total_seconds() * buffer elif buffer >= 1: time_limit = time.total_seconds() - buffer else: msg = f"Invalid buffer value: {buffer}" raise ValueError(msg) run_script(script, time_limit, frequency)