Working with Tasks

The following How-Tos illustrate common procedures for working with tasks.

How to Create Tasks

An empty task can always be instantiated by calling the constructor with no arguments:

from autojob.tasks.task import Task
from autojob.tasks.calculation import Calculation
from autojob.tasks.vibration import Vibration
from autojob.tasks.scan import BondScan

task = Task()
calc = Calculation()
scan = BondScan()
vib = Vibration()

The new task will have default values set for task inputs, task metadata, and any other inputs defined by the TaskBase implementation.

Creating multiple tasks is as simple as constructing them in a for loop:

from ase.data.g2 import data
from ase.build import molecule

from autojob.tasks.task import TaskInputs
from autojob.tasks.calculation import Calculation

calcs = []

for name in data.molecule_names:
    atoms = molecule(name)
    tinputs = TaskInputs(atoms=atoms)
    calcs.append(Calculation(task_inputs=tinputs))

How to Visualize the Results of a Vibration Task

In addition to obtaining the calculated Hessian matrix, vibrational energies, and zero-point energy, one may be interested in visualizing the normal modes for a system. To do so, first create a Vibration task.

from ase.build import molecule

from autojob.tasks.calculation import CalculationInputs
from autojob.tasks.task import TaskInputs
from autojob.tasks.vibration import Vibration

# Create Vibration task
co = molecule("CO")
tinputs = TaskInputs(atoms=co)
cinputs = CalculationInputs(
    calculator="EMT",
    calculation_script_template="vib.py.j2"
)
vib = Vibration(
    task_inputs=tinputs,
    calculation_inputs=cinputs
)

The vib.py.j2 template is analogous to the run.py.j2 template; however, this template calls the function ccu.workflows.run_vibration() using the VibrationInputs of the Vibration task. Note that we have not specified any VibrationInputs. The default inputs defined in VibrationInputs will result in all atoms of the structure being displaced during the vibrational analysis. To explicitly specify a list of atoms to be displaced, specify the indices parameter for VibrationInputs.

from ase.build import molecule

from autojob.tasks.calculation import CalculationInputs
from autojob.tasks.task import TaskInputs
# VibrationInputs is assumed to live alongside Vibration
from autojob.tasks.vibration import Vibration, VibrationInputs

# Create Vibration task
co = molecule("CO")
tinputs = TaskInputs(atoms=co)
cinputs = CalculationInputs(calculation_script_template="vib.py.j2")
# Only the first atom will be displaced
vinputs = VibrationInputs(indices=[0])
vib = Vibration(
    task_inputs=tinputs,
    calculation_inputs=cinputs,
    vibration_inputs=vinputs
)

Alternatively, you can specify which atoms to displace for the vibrational analysis by tag using the tags_to_unfreeze parameter.

from ase.build import molecule

from autojob.tasks.calculation import CalculationInputs
from autojob.tasks.task import TaskInputs
# VibrationInputs is assumed to live alongside Vibration
from autojob.tasks.vibration import Vibration, VibrationInputs

# Create Vibration task
co = molecule("CO")
tag = -99
co.set_tags([tag for _ in co])
tinputs = TaskInputs(atoms=co)
cinputs = CalculationInputs(calculation_script_template="vib.py.j2")
# Every atom carrying this tag (here, both atoms of CO) will be displaced
vinputs = VibrationInputs(tags_to_unfreeze=[tag])
vib = Vibration(
    task_inputs=tinputs,
    calculation_inputs=cinputs,
    vibration_inputs=vinputs
)
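
The selection by tag can be pictured with a small, library-free sketch. It assumes that tags_to_unfreeze selects every atom whose tag appears in the list; indices_from_tags is a hypothetical helper written for illustration and is not part of autojob, and the tag values are made up.

```python
def indices_from_tags(tags, tags_to_unfreeze):
    """Return the indices of atoms whose tag appears in ``tags_to_unfreeze``."""
    wanted = set(tags_to_unfreeze)
    return [i for i, tag in enumerate(tags) if tag in wanted]


# Hypothetical system: two slab atoms tagged 1, followed by a CO molecule tagged -99
tags = [1, 1, -99, -99]
selected = indices_from_tags(tags, tags_to_unfreeze=[-99])
print(selected)  # [2, 3]
```

Under this assumption, tagging is convenient when the atoms of interest (for example, an adsorbate) are easier to identify by label than by position in the structure.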

Next, write the task to a directory and run the task.

from pathlib import Path

dest = Path()
vib.write_inputs(dest)

# Run Vibration task
...

The results of a vibrational analysis can be harvested like any other task.

from autojob.tasks.vibration import Vibration

# Harvest Vibration task
harvested = Vibration.from_directory(dest)

Finally, construct a VibrationsData object and visualize the vibrational modes.

import ase.visualize
from ase.vibrations.data import VibrationsData

atoms = harvested.vibrations_outputs.atoms
hessian = harvested.vibrations_outputs.hessian
indices = harvested.vibrations_outputs.indices
vib_data = VibrationsData(atoms=atoms, hessian=hessian, indices=indices)

# Visualize vibration modes
for i, _ in enumerate(vib_data.get_modes()):
    images = list(vib_data.iter_animated_mode(i))
    ase.visualize.view(images)
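
The zero-point energy mentioned at the start of this section can also be recovered by hand from the vibrational energies. The following is a minimal sketch, assuming the energies are available as a sequence of (possibly complex) numbers in which imaginary modes are purely imaginary and therefore drop out of the sum. zero_point_energy is a hypothetical helper, not an autojob or ASE function, and the energies are illustrative values rather than calculated results.

```python
def zero_point_energy(energies):
    """Return half the sum of the real parts of the mode energies.

    The result has the same units as the input energies.
    """
    return 0.5 * sum(complex(energy).real for energy in energies)


# Illustrative mode energies in eV; the first entry represents an imaginary mode
mode_energies = [0.02j, 0.10, 0.25]
zpe = zero_point_energy(mode_energies)
print(f"Zero-point energy: {zpe:.3f} eV")
```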

How to Dump Tasks to a Directory

Multiple tasks can either be dumped to a structured directory by collecting them into a StudyGroup or be dumped individually into unstructured directories.

To write the inputs of a single task to a directory:

from pathlib import Path

from ase.build import molecule

from autojob.tasks.task import Task

task = Task()
task.task_inputs.atoms = molecule("CO")
# Write the task inputs to the current working directory
task.write_inputs(Path("."))

To dump tasks into a structured directory:

from pathlib import Path

from ase.data.g2 import data
from ase.build import molecule

from autojob.study import Study
from autojob.study_group import StudyGroup
from autojob.tasks.task import TaskInputs, TaskMetadata, Task

sg = StudyGroup()
study = Study(study_group_id=sg.study_group_id)
tasks = []

for name in data.molecule_names:
    atoms = molecule(name)
    tinputs = TaskInputs(atoms=atoms)
    metadata = TaskMetadata(
        study_group_id=sg.study_group_id,
        study_id=study.study_id,
    )
    task = Task(
        task_inputs=tinputs,
        task_metadata=metadata,
    )
    tasks.append(task)

study.tasks += tasks
sg.studies.append(study)
dest = Path()
sg.to_directory(dest)

To dump tasks into unstructured directories:

from pathlib import Path

from ase.data.g2 import data
from ase.build import molecule

from autojob.study import Study
from autojob.study_group import StudyGroup
from autojob.tasks.task import TaskInputs, TaskMetadata, Task

sg = StudyGroup()
study = Study(study_group_id=sg.study_group_id)
name_template = "task_{i}"
dest = Path()
tasks = []

for i, name in enumerate(data.molecule_names):
    atoms = molecule(name)
    tinputs = TaskInputs(atoms=atoms)
    metadata = TaskMetadata(
        study_group_id=sg.study_group_id,
        study_id=study.study_id,
    )
    task = Task(
        task_inputs=tinputs,
        task_metadata=metadata,
    )
    # create_task_tree is not imported above; import it from autojob before running
    new_task_dir = create_task_tree(task, dest, name_template.format(i=i))

How to Harvest Tasks

Tasks can be harvested from directories either directly using Task.from_directory() or indirectly using harvest(). If the specific type of task to load is known, it is recommended to call that class's from_directory() method directly:

from pathlib import Path

from autojob.tasks.task import Task
from autojob.tasks.calculation import Calculation
from autojob.tasks.scan import BondScan

task = Task.from_directory(Path("task_directory"))
calc = Calculation.from_directory(Path("calc_directory"))
scan = BondScan.from_directory(Path("bond_scan_directory"))

A more robust interface is to call Task.from_directory() with magic_mode=True. With this parameter set, the value of the "task_class" key in the task's metadata is used to load the correct implementation of TaskBase.

from pathlib import Path

from autojob.tasks.task import Task

task = Task.from_directory(Path("task_directory"), magic_mode=True)

Note

This feature requires "task_class" to be a registered task.
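
The idea of dispatching on a class name stored in metadata can be sketched with a simple registry. This is only an illustration of the general pattern and makes no claim about how autojob implements magic_mode; TASK_REGISTRY, register, load_task, ToyTask, and ToyCalculation are all hypothetical names.

```python
import json

# Hypothetical registry mapping class names to task classes
TASK_REGISTRY = {}


def register(cls):
    """Class decorator that records ``cls`` under its own name."""
    TASK_REGISTRY[cls.__name__] = cls
    return cls


@register
class ToyTask:
    def __init__(self, metadata):
        self.metadata = metadata


@register
class ToyCalculation(ToyTask):
    pass


def load_task(metadata_json):
    """Instantiate the class named by the "task_class" key of the metadata."""
    metadata = json.loads(metadata_json)
    name = metadata["task_class"]
    try:
        cls = TASK_REGISTRY[name]
    except KeyError:
        raise ValueError(f"{name!r} is not a registered task") from None
    return cls(metadata)


loaded = load_task('{"task_class": "ToyCalculation"}')
print(type(loaded).__name__)  # ToyCalculation
```

In this sketch, an unregistered class name raises a ValueError, which is why a scheme of this kind needs the class to be registered beforehand.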

Finally, tasks can be harvested from all task directories under a given directory using harvest().

from pathlib import Path

from autojob.harvest.harvest import harvest

tasks = harvest(Path())

This function searches for all subdirectories of the provided path that contain a task metadata file. The name of the task metadata file is controlled by the SETTINGS.TASK_METADATA_FILE setting.
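
A search of this kind can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the implementation of harvest(): the metadata file name "task.json" is a placeholder for whatever SETTINGS.TASK_METADATA_FILE is set to, and find_task_directories is a hypothetical helper.

```python
import tempfile
from pathlib import Path

# Placeholder file name; the real name comes from SETTINGS.TASK_METADATA_FILE
METADATA_FILE = "task.json"


def find_task_directories(root, metadata_file=METADATA_FILE):
    """Return, in sorted order, every directory under ``root`` holding ``metadata_file``."""
    matches = Path(root).rglob(metadata_file)
    return sorted(path.parent for path in matches if path.is_file())


# Build a throwaway directory tree: two task directories and one unrelated directory
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for relative in ("study/task_0", "study/task_1", "study/notes"):
        (root / relative).mkdir(parents=True)
    (root / "study/task_0" / METADATA_FILE).touch()
    (root / "study/task_1" / METADATA_FILE).touch()
    found = [d.relative_to(root).as_posix() for d in find_task_directories(root)]

print(found)  # ['study/task_0', 'study/task_1']
```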

How to Archive Tasks

autojob defines a convenience function, archive(), for serializing collections of tasks into JSON and CSV files.

from pathlib import Path

from autojob.harvest.archive import archive
from autojob.harvest.harvest import harvest

harvested = harvest(Path())
archive(
    filename="archive.json",
    archive_mode="both",
    harvested=harvested,
    time_stamp=True,
)

Of course, because tasks are Pydantic models, they can easily be converted into dictionaries using their pydantic.BaseModel.model_dump() methods. One can then use libraries like Pandas to process the data further or h5py to prepare more complicated formats.
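
As a rough illustration of such post-processing, the sketch below flattens nested dictionaries and writes them as CSV using only the standard library. The dictionaries stand in for the output of model_dump(); their field names and values are hypothetical, and flatten is a helper written for this example rather than part of autojob.

```python
import csv
import io


def flatten(record, prefix=""):
    """Flatten nested dictionaries into a single level with dotted keys."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


# Stand-ins for [task.model_dump() for task in tasks]; field names are hypothetical
dumped = [
    {"task_metadata": {"label": "CO"}, "task_outputs": {"energy": -1.5}},
    {"task_metadata": {"label": "H2O"}, "task_outputs": {"energy": -2.25}},
]
rows = [flatten(record) for record in dumped]

# Write the flattened rows to an in-memory CSV file
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)
csv_text = buffer.getvalue()
print(csv_text)
```

The same list of flattened rows could be passed to a Pandas DataFrame constructor when richer analysis is needed.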

How to Restart Tasks

To restart tasks from their directories, use restart(). Within Python:

from pathlib import Path

from autojob.next.restart import restart

new_task = restart(Path())

From the command line, use autojob restart:

autojob restart