Source code for autojob.harvest.archive

"""I/O functions for the utility scripts (compiled here for consistency)."""

from csv import DictWriter
from datetime import UTC
from datetime import datetime
import json
import logging
from pathlib import Path
from typing import Any
from typing import Literal

from autojob.bases.task_base import TaskBase
from autojob.tasks.calculation import Calculation

logger = logging.getLogger(__name__)


[docs] def flatten_calculations( calculations: list[Calculation], ) -> list[dict[str, Any]]: """Flatten each calculation into a CSV-friendly format. Args: calculations: The calculations to flatten. Returns: A list of dictionaries mapping calculation fields (e.g., ``energy``, ``forces``, ``zpe_correction``) to their values. The keys of nested dictionaries such as the calculation parameters are also accessible. """ flattened_calculations = [] for calculation in calculations: metadata = calculation.task_metadata.model_dump(mode="json") info = ( calculation.task_inputs.atoms.info if calculation.task_inputs.atoms else {} ) calc_inputs = calculation.calculation_inputs sched_inputs = calculation.scheduler_inputs calc_outputs_shell = {} if calc_outputs := calculation.calculation_outputs: calc_outputs_shell = calc_outputs.model_dump(mode="json") sched_outputs_shell = {} if sched_outputs := calculation.scheduler_outputs: sched_outputs_shell = sched_outputs.model_dump(mode="json") task_outcome = ( calculation.task_outputs.outcome if calculation.task_outputs else None ) flattened = { **metadata, **info, **calc_inputs.calc_params, **calc_inputs.opt_params, **calc_inputs.analyses, **calc_inputs.model_dump( mode="json", exclude={"calc_params", "opt_params"} ), **sched_inputs.model_dump(mode="json"), **calc_outputs_shell, **sched_outputs_shell, "outcome": task_outcome, } flattened_calculations.append(flattened) return flattened_calculations
[docs] def archive_json(tasks: list[TaskBase], dest: Path | None = None) -> None: """Archive a list of calculations in JSON format. Args: tasks: A list of tasks to archive. dest: The filename to use archive the calculation. Defaults to ``"database_<TIME_STAMP>.json"`` where ``TIME_STAMP`` is the current time in ISO format. """ time_stamp = ( datetime.now(UTC) .isoformat() .replace(":", "_") .replace("+", "_") .replace("-", "_") .replace(".", "_") ) json_archive = dest or Path.cwd().joinpath(f"task_{time_stamp}.json") to_dump = { str(d.task_metadata.task_id): d.model_dump(mode="json") for d in tasks } with json_archive.open(mode="w", encoding="utf-8") as file: json.dump( to_dump, fp=file, indent=4, sort_keys=True, ) logger.info("JSON archive written to %s", json_archive)
[docs] def archive_csv( calculations: list[Calculation], dest: Path | None = None, ) -> None: """Archive a list of calculations in CSV format. Args: calculations: A list of calculations to archive. dest: The filename to use archive the calculation. Defaults to ``"database_<TIME_STAMP>.csv"`` where ``TIME_STAMP`` is the current time in ISO format. """ time_stamp = ( datetime.now(UTC) .isoformat() .replace(":", "_") .replace("+", "_") .replace("-", "_") .replace(".", "_") ) csv_archive = dest or Path.cwd().joinpath(f"task_{time_stamp}.csv") flattened = flatten_calculations(calculations) fieldnames: set[str] = set() for flat in flattened: fieldnames = fieldnames.union( key for key, v in flat.items() if not isinstance(v, list | dict | None) ) with csv_archive.open(mode="w", encoding="utf-8") as file: writer = DictWriter(file, fieldnames=fieldnames) writer.writeheader() for row in flattened: writer.writerow({k: row.get(k, None) for k in fieldnames}) logger.info("CSV archive written to %s", csv_archive)
[docs] def archive( filename: str, archive_mode: Literal["csv", "json", "both"], harvested: list[Calculation], *, dest: Path | None = None, time_stamp: bool = False, ) -> None: """Archive completed calculations with the given format. Args: filename: The filename (without extension) with which to archive the calculations. archive_mode: The format with which to archive the calculations. Must be one of ``"csv"``, ``"json"``, or ``"both"``. harvested: The list of calculations to archive. dest: The directory in which to save the archives. time_stamp: Whether or not to time stamp the archive file. """ dest = dest or Path.cwd() if time_stamp: stamp = ( datetime.now(UTC) .isoformat() .replace(":", "_") .replace("+", "_") .replace("-", "_") .replace(".", "_") ) stamp = "_" + stamp else: stamp = "" stem = f"{filename}{stamp}" if archive_mode == "csv": file = dest.joinpath(f"{stem}.{archive_mode}") archive_csv(calculations=harvested, dest=file) elif archive_mode == "json": file = dest.joinpath(f"{stem}.{archive_mode}") archive_json(tasks=harvested, dest=file) elif archive_mode == "both": csv_file = dest.joinpath(f"{stem}.csv") json_file = dest.joinpath(f"{stem}.json") archive_csv(calculations=harvested, dest=csv_file) archive_json(tasks=harvested, dest=json_file)