Tags: python, etl, prefect

Automatically register new prefect flows?


Is there a mechanism to automatically register new flows when a local agent is running, without having to manually call e.g. flow.register(...) on each one?

In Airflow, I believe there is a process that regularly scans the specified Airflow home folder for files with "dag" in the name, then searches them for DAG objects. If it finds any, it loads them so they are accessible through the UI without having to manually 'register' them.

Does something similar exist for Prefect? For example, if I just created the following file test_flow.py, without necessarily running it or adding flow.run_agent(), is there a way for it to be magically registered and accessible through the UI :) just by existing in the proper place?

# prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

flow.register(project_name='main')

I could write a script that behaves like the Airflow process, scanning a folder and registering flows at regular intervals, but I wonder if that's a bit hacky, or if there is a better solution and I'm just thinking too much in terms of Airflow?
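For reference, such a folder-scanning registrar could be sketched as below. This is hypothetical, not a Prefect feature: the *_flow.py naming convention and the discover_flows/register_all helpers are assumptions. The sketch matches Flow objects by class name so it stands alone; real code should use isinstance(obj, prefect.Flow) instead.

```python
import importlib.util
import pathlib


def discover_flows(folder):
    """Yield (attribute_name, flow) pairs for Flow objects found in *_flow.py files."""
    for path in pathlib.Path(folder).glob("*_flow.py"):
        # Import the file as a throwaway module so we can inspect its attributes.
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for name, obj in vars(module).items():
            # Matching by class name keeps this sketch dependency-free;
            # in real code, check isinstance(obj, prefect.Flow).
            if type(obj).__name__ == "Flow":
                yield name, obj


def register_all(folder, project="main"):
    # Hypothetical helper: register every discovered flow (Prefect 1.x API).
    for _name, flow in discover_flows(folder):
        flow.register(project_name=project)
```

Run on a schedule (cron, a loop, etc.), this approximates Airflow's DAG-discovery behavior, with the caveats discussed in the answer below about why Prefect does not ship this.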


Solution

  • Great question (and awesome username!) - in short, I'd say you are thinking too much in terms of Airflow. There are a few reasons this is not currently available in Prefect:

    • explicit is better than implicit
    • Prefect flows are not constrained to live in one place, nor to share the same runtime environment; this makes automatically discovering and re-serializing a flow complicated from a single agent process (which is not required to share the runtime environment of the flows it submits)
    • agents are better thought of as being parametrized by deployment infrastructure, not flow storage

    Ideally for production workflows you'd use a CI/CD process so that anytime you make a code change an automatic job is triggered that re-registers the flow. A few comments that may be helpful:

    • you don't actually need to re-register the flow for every possible code change; for example, if you changed the message that hello_task logs, you could simply re-save the flow to its original storage location (what this looks like depends on the type of storage you use). You only need to re-register when the flow's metadata changes (retry settings, task names, dependency relationships, etc.)
    • you can use flow.register("My Project", idempotency_key=flow.serialized_hash()) to capture this automatically; this pattern registers a new version only if the flow's backend representation changes in some way
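To make the idempotency-key idea concrete, here is a minimal sketch of the concept: hash the flow's metadata and skip registration when the hash is unchanged. Note that flow.serialized_hash() is the real Prefect 1.x API; the serialized_hash and maybe_register helpers and the plain-dict metadata below are illustrative assumptions, not Prefect internals.

```python
import hashlib
import json


def serialized_hash(flow_metadata: dict) -> str:
    # Stable digest of the flow's metadata (task names, retries, edges, ...),
    # mirroring the role of Prefect's flow.serialized_hash().
    blob = json.dumps(flow_metadata, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


def maybe_register(flow_metadata: dict, last_key: str, register) -> str:
    # Only call register() when the metadata-derived key has changed -- the
    # same effect as passing idempotency_key=flow.serialized_hash() to
    # flow.register(...): unchanged flows do not produce a new version.
    key = serialized_hash(flow_metadata)
    if key != last_key:
        register(flow_metadata)
    return key
```

Under this scheme, changing only a task's log message leaves the metadata (and hence the key) unchanged, so no new version is registered; changing retry settings, task names, or dependencies produces a new key and triggers re-registration.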