I want to loop over tasks, again and again, until reaching a certain condition before continuing the rest of the workflow.
What I have so far is this:
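```python
import prefect
from prefect import Task
from prefect.engine.signals import LOOP


# Loop task
class MyLoop(Task):
    def run(self):
        # Value passed on by the previous iteration (1 on the first run)
        loop_res = prefect.context.get("task_loop_result", 1)
        print(loop_res)
        if loop_res >= 10:
            return loop_res
        raise LOOP(result=loop_res + 1)
```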
But as far as I understand, this does not work for multiple tasks. Is there a way to go back further and loop over several tasks at a time?
Answer
The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:
```python
import prefect
from prefect import Flow, Parameter, Task
from prefect.engine.signals import LOOP
from prefect.executors import LocalDaskExecutor  # import path assumed for Prefect 0.14+/1.x


class MultipleTaskLoop(Task):
    def run(self):
        # Get previous value (1 on the first iteration)
        loop_res = prefect.context.get("task_loop_result", 1)

        # Create subflow
        with Flow("Subflow", executor=LocalDaskExecutor()) as flow:
            x = Parameter("x", default=1)
            loop1 = print_loop()
            add = add_value(x)
            loop2 = print_loop()
            loop1.set_downstream(add)
            add.set_downstream(loop2)

        # Run subflow and extract the result of the add task
        subflow_res = flow.run(parameters={"x": loop_res})
        new_res = subflow_res.result[add].result

        # Loop
        if new_res >= 10:
            return new_res
        raise LOOP(result=new_res)
```
where print_loop simply prints “loop” in the output and add_value adds one to the value it receives.
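
To make the control flow easier to follow, here is a rough plain-Python analogy of the pattern above. It does not use Prefect at all: `Loop`, `run_subflow`, and `run_with_looping` are hypothetical stand-ins I made up for the LOOP signal, the subflow, and the task runner, and the real Prefect internals may well differ. It only illustrates the idea that the whole group of steps runs once per iteration and the result is handed to the next iteration through the context.

```python
# Plain-Python analogy only; none of these names are Prefect APIs.

class Loop(Exception):
    """Hypothetical stand-in for the LOOP signal: carries the value for the next iteration."""

    def __init__(self, result):
        super().__init__(result)
        self.result = result


def add_value(x):
    # Adds one to the value it receives, as described in the answer
    return x + 1


def run_subflow(x, log):
    # Stand-in for the subflow: print_loop -> add_value -> print_loop
    log.append("loop")
    new_x = add_value(x)
    log.append("loop")
    return new_x


def multiple_task_loop(context, log):
    # Same shape as MultipleTaskLoop.run: read previous value, run the group, loop or return
    loop_res = context.get("task_loop_result", 1)
    new_res = run_subflow(loop_res, log)
    if new_res >= 10:
        return new_res
    raise Loop(result=new_res)


def run_with_looping(task, log):
    # Hypothetical runner: re-invokes the task until it returns instead of raising Loop
    context = {}
    while True:
        try:
            return task(context, log)
        except Loop as signal:
            context["task_loop_result"] = signal.result


log = []
final = run_with_looping(multiple_task_loop, log)
print(final, len(log))
```

Because the looping task wraps the whole group, every iteration re-runs all three steps, which is what the single-task LOOP in the question could not do on its own.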