# Source code for autojob.harvest.archive

"""I/O functions for the utility scripts (compiled here for consistency)."""

from csv import DictWriter
from datetime import UTC
from datetime import datetime
import json
import logging
from pathlib import Path
from typing import Any
from typing import Literal

from autojob import SETTINGS
from autojob.bases.task_base import TaskBase

logger = logging.getLogger(__name__)


# TODO: Use "." notation for nested attributes
def flatten_calculations(
    calculations: list[TaskBase],
) -> list[dict[str, Any]]:
    """Collapse each calculation into a single-level, CSV-friendly mapping.

    For every calculation the following sources are merged, in this order,
    into one dictionary: the JSON dump of the task metadata, the ``info``
    dictionary of the input atoms (when atoms are present), the calculation
    parameters, the optimisation parameters, the analyses, the JSON dump of
    the remaining calculation inputs, the JSON dump of the scheduler inputs,
    and the JSON dumps of the calculation and scheduler outputs (when
    present). The task outcome is stored last under the ``"outcome"`` key.

    When two sources share a key, the value from the later source wins.

    Args:
        calculations: The calculations to flatten.

    Returns:
        One flat dictionary per calculation, in the same order as
        ``calculations``.
    """
    rows: list[dict[str, Any]] = []

    for task in calculations:
        atoms = task.task_inputs.atoms
        calc_in = task.calculation_inputs
        sched_in = task.scheduler_inputs
        calc_out = task.calculation_outputs
        sched_out = task.scheduler_outputs
        task_out = task.task_outputs

        # Layers are applied first to last, so later layers override earlier
        # ones on key collisions while keeping the first-seen key position.
        layers = (
            task.task_metadata.model_dump(mode="json"),
            atoms.info if atoms else {},
            calc_in.calc_params,
            calc_in.opt_params,
            calc_in.analyses,
            calc_in.model_dump(
                mode="json",
                exclude={"calc_params", "opt_params", "analyses"},
            ),
            sched_in.model_dump(mode="json"),
            calc_out.model_dump(mode="json") if calc_out else {},
            sched_out.model_dump(mode="json") if sched_out else {},
        )

        row: dict[str, Any] = {}
        for layer in layers:
            row.update(layer)
        row["outcome"] = task_out.outcome if task_out else None

        rows.append(row)

    return rows
def archive_json(tasks: list[TaskBase], dest: Path | None = None) -> Path:
    """Write a list of tasks to a JSON archive.

    The archive is a single JSON object that maps the string form of each
    task's ``task_metadata.task_id`` to the JSON dump of that task. Keys are
    sorted and the output is indented by four spaces.

    Args:
        tasks: The tasks to archive.
        dest: The file to write the archive to. Defaults to
            ``"task_<TIME_STAMP>.json"`` in the current working directory,
            where ``TIME_STAMP`` is the current UTC time in ISO format with
            ``:``, ``+``, ``-`` and ``.`` replaced by underscores.

    Returns:
        The path of the JSON archive.
    """
    if dest:
        target = dest
    else:
        # Swap every character that is awkward in a filename for "_".
        iso_now = datetime.now(UTC).isoformat()
        stamp = iso_now.translate(str.maketrans(":+-.", "____"))
        target = Path.cwd() / f"task_{stamp}.json"

    payload = {}
    for task in tasks:
        payload[str(task.task_metadata.task_id)] = task.model_dump(mode="json")

    with target.open(mode="w", encoding="utf-8") as handle:
        json.dump(payload, fp=handle, indent=4, sort_keys=True)

    logger.info("JSON archive written to %s", target)
    return target
def archive_csv(tasks: list[TaskBase], dest: Path | None = None) -> Path:
    """Archive a list of calculations in CSV format.

    Each task is flattened with ``flatten_calculations`` and written as one
    row. A column is created for every key that holds a value other than a
    list, a dictionary or ``None`` in at least one row. Columns appear in the
    order in which their keys are first encountered, so the layout is
    reproducible from run to run. A row that lacks a column's key gets an
    empty cell for it.

    Args:
        tasks: A list of calculations to archive.
        dest: The file to write the archive to. Defaults to
            ``"task_<TIME_STAMP>.csv"`` in the current working directory,
            where ``TIME_STAMP`` is the current UTC time in ISO format with
            ``:``, ``+``, ``-`` and ``.`` replaced by underscores.

    Returns:
        The path of the CSV archive.
    """
    time_stamp = (
        datetime.now(UTC)
        .isoformat()
        .replace(":", "_")
        .replace("+", "_")
        .replace("-", "_")
        .replace(".", "_")
    )
    csv_archive = dest or Path.cwd().joinpath(f"task_{time_stamp}.csv")
    flattened = flatten_calculations(tasks)

    # A dict serves as an insertion-ordered set: unlike ``set[str]``, its
    # iteration order does not depend on string hash randomisation, so the
    # column order is the same on every run.
    seen: dict[str, None] = {}
    for flat in flattened:
        for key, value in flat.items():
            if not isinstance(value, list | dict | None):
                seen.setdefault(key)
    fieldnames = list(seen)

    # ``newline=""`` is required by the csv module; without it the writer's
    # "\r\n" terminators are translated again on some platforms, producing
    # blank lines between rows.
    with csv_archive.open(mode="w", encoding="utf-8", newline="") as file:
        writer = DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        # Rows are narrowed to the selected columns because DictWriter
        # rejects unexpected keys by default.
        writer.writerows(
            {key: row.get(key) for key in fieldnames} for row in flattened
        )

    logger.info("CSV archive written to %s", csv_archive)
    return csv_archive
def archive(
    tasks: list[TaskBase],
    *,
    stem: str | None = None,
    archive_mode: Literal["csv", "json", "both"] = "json",
    dest: Path | None = None,
    time_stamp: bool = False,
) -> list[Path]:
    """Archive calculations as CSV, JSON, or both.

    Args:
        tasks: The list of calculations to archive.
        stem: The filename stem (**without extension**) of the archives.
            Defaults to the stem of ``SETTINGS.ARCHIVE_FILE``.
        archive_mode: The archive format; one of ``"csv"``, ``"json"`` or
            ``"both"``. A falsy value is treated as ``"json"``.
        dest: The directory in which to save the archives. Defaults to the
            current working directory.
        time_stamp: Whether to append ``"_<TIME_STAMP>"`` to the stem, where
            ``TIME_STAMP`` is the current UTC time in ISO format with ``:``,
            ``+``, ``-`` and ``.`` replaced by underscores.

    Returns:
        The paths of the archives created, CSV first when both formats are
        written. The list is empty if ``archive_mode`` matches none of the
        supported values.
    """
    base_name = stem or Path(SETTINGS.ARCHIVE_FILE).stem
    mode = archive_mode or "json"
    out_dir = dest or Path.cwd()

    if time_stamp:
        iso_now = datetime.now(UTC).isoformat()
        suffix = iso_now.translate(str.maketrans(":+-.", "____"))
        base_name = f"{base_name}_{suffix}"

    # (extension, writer) pairs in the order the archives are produced.
    writers = (
        ("csv", archive_csv),
        ("json", archive_json),
    )

    archives: list[Path] = []
    for extension, write in writers:
        if mode not in (extension, "both"):
            continue
        target = out_dir.joinpath(f"{base_name}.{extension}")
        archives.append(write(tasks=tasks, dest=target))

    return archives