"""Create workflows from multiple step."""fromcollections.abcimportIterablefromcopyimportdeepcopyfromgraphlibimportTopologicalSorterimportjsonimportpathlibfromtypingimportAnyfromtypingimportLiteralfromuuidimportUUIDfrompydanticimportBaseModelfrompydanticimportFieldfromautojobimportSETTINGSfromautojob.parametrizationsimportVariableReferenceAnchorLevel=Literal[0,1]
class Step(BaseModel):
    """A step in a workflow."""

    # UUID of this step; presumably unique within its workflow -- TODO confirm
    # against the code that creates steps.
    workflow_step_id: UUID
    # Free-form string naming the kind of task; the accepted values are not
    # visible in this module.
    task_type: str
    # Restricted to "independent" or "dependent"; defaults to "independent".
    # The meaning of each value is given in the field description below.
    progression: Literal["independent", "dependent"] = Field(
        default="independent",
        description="How the step is connected to the tasks of the previous "
        "step. 'dependent' indicates that the given step cannot start until "
        "every task in the previous step has completed. 'independent' "
        "indicates the opposite.",
    )
    # Required field: a list of lists of variable references. The outer list
    # must contain at least one entry (min_length=1); inner lists are not
    # length-constrained here.
    # NOTE(review): presumably each inner list is one parametrization of the
    # step's tasks -- confirm against autojob.parametrizations.
    parametrizations: list[list[VariableReference[Any]]] = Field(min_length=1)
[docs]classWorkflow(TopologicalSorter):"""The structure of a workflow."""def__init__(self,graph:dict[str,Iterable[str]])->None:"""Initialize a `Workflow`. Args: graph: A directed-acyclic graph representing the workflow. """self._graph={k:list(v)fork,vindeepcopy(graph).items()}super().__init__(graph)def__getitem__(self,key:Any)->list[str]:"""Get the ancestors of the step indicated by `key`."""returnself._graph[key]
[docs]defget_predecessors(self,step_id:str)->list[str]:"""Return the immediate ancestors of a workflow step. Args: step_id: A string representation of a workflow step ID. """returnself._graph[step_id]
[docs]defget_next_steps(self,step_id:str,record:list[str]|None=None)->list[str]:"""Get all successors of a step. Args: step_id: A string representation of a workflow step ID record: A list of strings where each string represents the workflow step ID of a completed task. If None, all successive steps will be returned. Defaults to None. Returns: A list of strings where each string represents a workflow step ID. """active_steps:list[str]=[]forsuccessor,predecessorsinself._graph.items():ifstep_idinpredecessorsand(recordisNoneorall(predecessorinrecordforpredecessorinpredecessors)):active_steps.append(successor)returnactive_steps
[docs]@classmethoddeffrom_directory(cls,dir_name:pathlib.Path)->"Workflow":"""Construct a `Workflow` from a directory."""workflow_file=dir_name.joinpath(SETTINGS.WORKFLOW_FILE)withworkflow_file.open(mode="r",encoding="utf-8")aswf:returncls(json.load(wf))