Changing task_runner in prefect deployment


Is there a way to change the task_runner within a Prefect deployment? For a single flow, I would like to be able to have one deployment that uses, say, ConcurrentTaskRunner and another that uses DaskTaskRunner (local or remote).

The only way I have found so far is to set an environment variable in the deployment:

infra_overrides:
  env:
    dask_server: True

And at the flow level, something like:

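import os
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner

def determine_runner():
    # Environment variables are strings, so compare against "True".
    return DaskTaskRunner if os.environ.get("dask_server") == "True" else ConcurrentTaskRunner

@flow(task_runner=determine_runner())
def my_flow():
    pass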

This works because a normal run does not have the dask_server variable, whereas in the special deployment run, where I set it, the agent starts each run in a clean environment with the variable defined. But my guess is that there must be a better way. If there were a solution at the deployment level, I could have a single function that builds deployments from flows, instead of adding a determine_runner function to each flow.
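Short of a deployment-level option, the per-flow determine_runner function could at least be factored into one shared helper that every flow imports. The following is only a minimal sketch of that idea, not a Prefect feature: the helper name select_task_runner, the TASK_RUNNER variable, the registry keys, and the scheduler address are all hypothetical, and the two Fake classes are stand-ins so that the sketch runs without Prefect installed.

```python
import os
from typing import Callable, Mapping, Optional


def select_task_runner(
    factories: Mapping[str, Callable[[], object]],
    default: str,
    env_var: str = "TASK_RUNNER",  # hypothetical variable name
    environ: Optional[Mapping[str, str]] = None,
) -> object:
    """Build the runner whose registry key is stored in env_var."""
    env = os.environ if environ is None else environ
    name = env.get(env_var, default)
    try:
        factory = factories[name]
    except KeyError:
        raise ValueError(
            f"Unknown task runner {name!r}; expected one of {sorted(factories)}"
        ) from None
    return factory()


# Stand-ins for ConcurrentTaskRunner and DaskTaskRunner (assumption:
# real code would register the Prefect classes here instead).
class FakeConcurrentRunner:
    pass


class FakeDaskRunner:
    def __init__(self, address: Optional[str] = None):
        self.address = address


RUNNERS = {
    "concurrent": FakeConcurrentRunner,
    "dask_local": FakeDaskRunner,
    # Hypothetical scheduler address for a remote cluster.
    "dask_remote": lambda: FakeDaskRunner(address="tcp://scheduler:8786"),
}

runner = select_task_runner(RUNNERS, default="concurrent")
```

Each flow would then be decorated with @flow(task_runner=select_task_runner(RUNNERS, default="concurrent")), and each deployment would set TASK_RUNNER under infra_overrides env in the same way as dask_server above. The lookup still happens when the flow module is imported, so this only centralizes the logic rather than moving it to the deployment.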

Of course, it would be best if it were possible to do something like:

Deployment.build_from_flow(
    ...
    task_runner=my_preferred_runner,
)

However, this is not implemented.


Solution

  • You can use an environment variable to determine which task runner gets used. This GitHub issue has a detailed explanation, but here is a TL;DR:

    @flow(
        task_runner=DaskTaskRunner()
        if os.environ.get("MY_ENV") == "prod"
        else ConcurrentTaskRunner(),
    )
    def my_flow():