Is there a way to change the task_runner within a Prefect deployment? For a single flow, I would like to be able to have one deployment with, say, ConcurrentTaskRunner and another with DaskTaskRunner (local or remote).
The only way I have found so far is to set an environment variable within the deployment:

    infra_overrides:
      env:
        dask_server: True
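And on the flow level something like:

    import os

    from prefect import flow
    from prefect.task_runners import ConcurrentTaskRunner
    from prefect_dask import DaskTaskRunner

    def determine_runner():
        # Use Dask only when the deployment sets dask_server in the environment.
        if os.environ.get("dask_server") == "True":
            return DaskTaskRunner
        return ConcurrentTaskRunner

    @flow(task_runner=determine_runner())
    def my_flow():
        pass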
This works because a normal run has no dask_server variable, while in the special deployment, where I set this variable, the agent starts each run in a clean environment with the variable set. But my guess is that there must be a better way. If there were a solution at the deployment level, I could have a single function that builds deployments from flows, instead of adding a determine_runner function to each flow.
Of course, it would be best if it were possible to do something like:

    Deployment.build_from_flow(
        ...
        task_runner=my_preferred_runner,
    )

but that is not implemented.
Answer
You can add environment variables that determine which task runner gets used. This GitHub issue has a detailed explanation but here is a TL;DR:

    @flow(
        task_runner=DaskTaskRunner()
        if os.environ.get("MY_ENV") == "prod"
        else ConcurrentTaskRunner(),
    )
    def my_flow():
        ...
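
If you don't want to repeat that conditional in every flow, one option is to move the environment check into a small shared helper. The following is only a sketch under a few assumptions: `env_flag` and `pick_task_runner` are hypothetical names, not part of Prefect, and the flag name `dask_server` is taken from the question. The helper accepts the runner classes as arguments, so the module itself does not import Prefect, and it compares the value case-insensitively so that "True", "true" or "1" all count as set.

```python
import os
from typing import Callable, Mapping, Optional, TypeVar

R = TypeVar("R")

# Strings treated as "flag is set"; everything else counts as unset.
TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when the variable holds a truthy string (case-insensitive)."""
    env = os.environ if env is None else env
    return env.get(name, "").strip().lower() in TRUTHY


def pick_task_runner(
    default_factory: Callable[[], R],
    dask_factory: Callable[[], R],
    flag: str = "dask_server",
    env: Optional[Mapping[str, str]] = None,
) -> R:
    """Build the Dask runner when the flag is set, otherwise the default one."""
    factory = dask_factory if env_flag(flag, env) else default_factory
    return factory()


# Hypothetical usage in a flow module (requires prefect and prefect-dask):
#
#   from prefect import flow
#   from prefect.task_runners import ConcurrentTaskRunner
#   from prefect_dask import DaskTaskRunner
#
#   @flow(task_runner=pick_task_runner(ConcurrentTaskRunner, DaskTaskRunner))
#   def my_flow():
#       ...
```

Each flow then needs only the one-line `pick_task_runner(...)` call in its decorator, and the deployment still selects the runner through `infra_overrides` as in the question. The choice is made when the flow module is imported, so the variable has to be set in the environment where the flow run starts.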