I want to loop over tasks, again and again, until reaching a certain condition before continuing the rest of the workflow.
What I have so far is this:
# Loop task class MyLoop(Task): def run(self): loop_res = prefect.context.get("task_loop_result", 1) print (loop_res) if loop_res >= 10: return loop_res raise LOOP(result=loop_res+1)
But as far as I understand this does not work for multiple tasks. Is there a way to come back further and loop on several tasks at a time ?
Advertisement
Answer
The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:
class MultipleTaskLoop(Task): def run(self): # Get previous value loop_res = prefect.context.get("task_loop_result", 1) # Create subflow with Flow('Subflow', executor=LocalDaskExecutor()) as flow: x = Parameter('x', default = 1) loop1 = print_loop() add = add_value(x) loop2 = print_loop() loop1.set_downstream(add) add.set_downstream(loop2) # Run subflow and extract result subflow_res = flow.run(parameters={'x': loop_res}) new_res = subflow_res.result[add]._result.value # Loop if new_res >= 10: return new_res raise LOOP(result=new_res)
where print_loop
simply prints “loop” in the output and add_value
adds one to the value it receives.