"""I/O functions for the utility scripts (compiled here for consistency)."""
from csv import DictWriter
from datetime import UTC
from datetime import datetime
import json
import logging
from pathlib import Path
from typing import Any
from typing import Literal
from autojob import SETTINGS
from autojob.bases.task_base import TaskBase
logger = logging.getLogger(__name__)
# TODO: Use "." notation for nested attributes
[docs]
def flatten_calculations(
calculations: list[TaskBase],
) -> list[dict[str, Any]]:
"""Flatten each calculation into a CSV-friendly format.
Args:
calculations: The calculations to flatten.
Returns:
A list of dictionaries mapping calculation fields (e.g., ``energy``,
``forces``, ``zpe_correction``) to their values. The keys of
nested dictionaries such as the calculation parameters are also
accessible.
"""
flattened_calculations = []
for calculation in calculations:
metadata = calculation.task_metadata.model_dump(mode="json")
info = (
calculation.task_inputs.atoms.info
if calculation.task_inputs.atoms
else {}
)
calc_inputs = calculation.calculation_inputs
sched_inputs = calculation.scheduler_inputs
calc_outputs_shell = {}
if calc_outputs := calculation.calculation_outputs:
calc_outputs_shell = calc_outputs.model_dump(mode="json")
sched_outputs_shell = {}
if sched_outputs := calculation.scheduler_outputs:
sched_outputs_shell = sched_outputs.model_dump(mode="json")
task_outcome = (
calculation.task_outputs.outcome
if calculation.task_outputs
else None
)
flattened = {
**metadata,
**info,
**calc_inputs.calc_params,
**calc_inputs.opt_params,
**calc_inputs.analyses,
**calc_inputs.model_dump(
mode="json", exclude={"calc_params", "opt_params", "analyses"}
),
**sched_inputs.model_dump(mode="json"),
**calc_outputs_shell,
**sched_outputs_shell,
"outcome": task_outcome,
}
flattened_calculations.append(flattened)
return flattened_calculations
[docs]
def archive_json(tasks: list[TaskBase], dest: Path | None = None) -> Path:
"""Archive a list of calculations in JSON format.
Args:
tasks: A list of tasks to archive.
dest: The filename to use archive the calculation. Defaults to
``"database_<TIME_STAMP>.json"`` where ``TIME_STAMP`` is the
current time in ISO format.
Returns:
The filename of the JSON archive.
"""
time_stamp = (
datetime.now(UTC)
.isoformat()
.replace(":", "_")
.replace("+", "_")
.replace("-", "_")
.replace(".", "_")
)
json_archive = dest or Path.cwd().joinpath(f"task_{time_stamp}.json")
to_dump = {
str(d.task_metadata.task_id): d.model_dump(mode="json") for d in tasks
}
with json_archive.open(mode="w", encoding="utf-8") as file:
json.dump(
to_dump,
fp=file,
indent=4,
sort_keys=True,
)
logger.info("JSON archive written to %s", json_archive)
return json_archive
[docs]
def archive_csv(tasks: list[TaskBase], dest: Path | None = None) -> Path:
"""Archive a list of calculations in CSV format.
Args:
tasks: A list of calculations to archive.
dest: The filename to use archive the calculation. Defaults to
``"database_<TIME_STAMP>.csv"`` where ``TIME_STAMP`` is the
current time in ISO format.
Returns:
The filename of the CSV archive.
"""
time_stamp = (
datetime.now(UTC)
.isoformat()
.replace(":", "_")
.replace("+", "_")
.replace("-", "_")
.replace(".", "_")
)
csv_archive = dest or Path.cwd().joinpath(f"task_{time_stamp}.csv")
flattened = flatten_calculations(tasks)
fieldnames: set[str] = set()
for flat in flattened:
fieldnames = fieldnames.union(
key
for key, v in flat.items()
if not isinstance(v, list | dict | None)
)
with csv_archive.open(mode="w", encoding="utf-8") as file:
writer = DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
for row in flattened:
writer.writerow({k: row.get(k, None) for k in fieldnames})
logger.info("CSV archive written to %s", csv_archive)
return csv_archive
[docs]
def archive(
tasks: list[TaskBase],
*,
stem: str | None = None,
archive_mode: Literal["csv", "json", "both"] = "json",
dest: Path | None = None,
time_stamp: bool = False,
) -> list[Path]:
"""Archive completed calculations with the given format.
Args:
tasks: The list of calculations to archive.
stem: The filename stem (**without extension**) with which to
archive the calculations. Defaults to ``SETTINGS.ARCHIVE_FILE``.
archive_mode: The format with which to archive the calculations. Must
be one of ``"csv"``, ``"json"``, or ``"both"``. Defaults to
``"json"``.
dest: The directory in which to save the archives.
time_stamp: Whether or not to time stamp the archive file.
Returns:
A list of archives created.
"""
stem = stem or Path(SETTINGS.ARCHIVE_FILE).stem
archive_mode = archive_mode or "json"
dest = dest or Path.cwd()
if time_stamp:
stamp = (
datetime.now(UTC)
.isoformat()
.replace(":", "_")
.replace("+", "_")
.replace("-", "_")
.replace(".", "_")
)
stamp = "_" + stamp
else:
stamp = ""
stem = f"{stem}{stamp}"
archives: list[Path] = []
if archive_mode in ("csv", "both"):
file = dest.joinpath(f"{stem}.csv")
archives.append(archive_csv(tasks=tasks, dest=file))
if archive_mode in ("json", "both"):
file = dest.joinpath(f"{stem}.json")
archives.append(archive_json(tasks=tasks, dest=file))
return archives