
Is there a way to backfill historical data (once) for a new Flow in Prefect?


I just started reading about Prefect (and have a little experience using Airflow).

My goal is to set up a task that runs daily in Prefect and collects data into a folder (I assume that is something Prefect can certainly help with). I also want to populate the historical data, as if this job had been running back in time.

Airflow has the concept of a start_date: when it is set in the past, the DAG runs from that date onward and fills in each interval.

For example, if I have a task which takes a date and returns the data for that date, such as:

# Pseudo code: fetched_values_in_json is a placeholder for the real fetch logic
from datetime import datetime

def get_values_from_somewhere(date: datetime) -> dict:
    return fetched_values_in_json(date)

Is there a native way to do this in Prefect? I could not find this answered on this site or in the docs, although backfilling is mentioned in passing. Any help or guidance would be very useful.

What I tried:

When I set the schedule to:

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])

and then call flow.run(), I simply get:

INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00

I was expecting it to run once for every interval since the start_date I provided, catch up to the present, and then wait for the next scheduled run.


Solution

  • Prefect does not make any implicit assumptions about how your Flow or its Tasks depend on time, and therefore performing a backfill depends on the structure of your Flow. There are generally two ways that time explicitly influences a Flow:

    • through a Parameter or DateTimeParameter
    • through prefect.context (which includes many time-related fields, such as scheduled_start_time)

    Given that, you can perform a backfill by creating any number of ad-hoc flow runs, each overriding either the appropriate context key or the parameter's default value. (Ad-hoc runs can be created for any flow, whether or not it has a schedule.)

    To make this more precise, here are two examples that trigger a single backfill run (to accommodate more runs, loop over the appropriate values and create a run for each):

    Using Context

    import pendulum
    import prefect
    from prefect import Flow
    
    
    @prefect.task
    def do_something_time_specific():
        """
        This task uses the timestamp provided to the custom `backfill_time`
        context key; if that does not exist, it falls back on the built-in
        `scheduled_start_time` context key.
        """
    
        current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
        if isinstance(current_time, str):
            current_time = pendulum.parse(current_time)
        # performs some action dealing with this timestamp
    
    
    flow = Flow("backfill", tasks=[do_something_time_specific])
    
    ## using Core
    flow.run() # will use current timestamp
    flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp
    
    ## using an API
    prefect.Client().create_flow_run(flow_id="FLOWID", 
        context={"backfill_time": "1986-01-02"}) # will use old timestamp
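Because the task above depends on time only through a context lookup, its fallback logic can be checked without Prefect at all. The following is a minimal stdlib-only sketch, not Prefect code: resolve_time is a hypothetical helper, a plain dict stands in for prefect.context, and datetime.fromisoformat is swapped in for pendulum.parse (it accepts fewer formats). It also assumes that date strings without an offset are meant as UTC.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def resolve_time(context: Mapping[str, Any]) -> Optional[datetime]:
    """Hypothetical helper mirroring do_something_time_specific's lookup:
    prefer the custom `backfill_time` key, else fall back on `scheduled_start_time`."""
    current_time = context.get("backfill_time") or context.get("scheduled_start_time")
    if isinstance(current_time, str):
        # stdlib stand-in for pendulum.parse; accepts ISO 8601 such as "1986-01-02"
        parsed = datetime.fromisoformat(current_time)
        # assumption: a string without an offset is treated as UTC
        current_time = parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
    return current_time


scheduled = datetime(2020, 9, 24, tzinfo=timezone.utc)
# the backfill override wins over the scheduled time
print(resolve_time({"backfill_time": "1986-01-02", "scheduled_start_time": scheduled}))
# without the override, the scheduled time is used unchanged
print(resolve_time({"scheduled_start_time": scheduled}))
```

Keeping the precedence rule in one small function like this makes it easy to unit test separately from any flow run.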
    

    Using a Parameter

    import pendulum
    import prefect
    from prefect import Flow
    
    
    current_time = prefect.Parameter("current_time", default=None)
    
    @prefect.task
    def do_something_time_specific(current_time):
        """
        This task uses the timestamp provided to the task explicitly.
        """
        current_time = current_time or pendulum.now("utc") # uses "now" if not provided
        if isinstance(current_time, str):
            current_time = pendulum.parse(current_time)
    
        # performs some action dealing with this timestamp
    
    
    with Flow("backfill") as flow:
        do_something_time_specific(current_time)
    
    
    ## using Core
    flow.run() # will use current timestamp
    flow.run(current_time="1986-01-02") # will use old timestamp
    
    ## using an API
    prefect.Client().create_flow_run(flow_id="FLOWID", 
        parameters={"current_time": "1986-01-02"}) # will use old timestamp
    

    Newer parameter classes such as DateTimeParameter provide nicer typing guarantees, but hopefully this demonstrates the idea.
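The "loop over the appropriate values and create a run for each" step mentioned earlier can be sketched as follows. This is a minimal Prefect-free sketch: backfill_dates and run_backfill are hypothetical helpers, and `run` is whatever launches a single run, for example a lambda wrapping flow.run(parameters=...) for the Parameter version or flow.run(context=...) for the context version. Dates are passed as ISO strings because both examples above parse strings.

```python
from datetime import date, timedelta
from typing import Callable, Iterator, List


def backfill_dates(start: date, end: date, step: timedelta = timedelta(days=1)) -> Iterator[date]:
    """Yield start, start + step, ... up to and including end (hypothetical helper)."""
    current = start
    while current <= end:
        yield current
        current += step


def run_backfill(run: Callable[[str], object], start: date, end: date) -> List[str]:
    """Call `run` once per day with an ISO date string; return the dates launched."""
    launched = []
    for day in backfill_dates(start, end):
        iso_day = day.isoformat()  # e.g. "2019-01-01", a format both examples can parse
        run(iso_day)
        launched.append(iso_day)
    return launched


# Stand-in launcher that just records calls. With a real flow it could be e.g.
#   lambda d: flow.run(parameters={"current_time": d}, run_on_schedule=False)
#   lambda d: prefect.Client().create_flow_run(flow_id="FLOWID", parameters={"current_time": d})
calls = []
run_backfill(calls.append, date(2019, 1, 1), date(2019, 1, 3))
print(calls)
```

Injecting the launcher keeps the date arithmetic separate from how runs are created, so the same loop works for local Core runs and for runs created through the API.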

    EDIT: For completeness, note that in Core, ad-hoc runs can be created for flows with schedules by calling flow.run(run_on_schedule=False).
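This also accounts for the log line in the question: without run_on_schedule=False, flow.run() on a scheduled flow waits for the next clock event after the current time instead of replaying past ones. The following is a simplified model, assumed for illustration rather than taken from Prefect's implementation, that ignores time zones and DST. It treats an interval clock's events as start_date + k * interval, so the next event can be computed with plain arithmetic. next_event is a hypothetical name.

```python
from datetime import datetime, timedelta


def next_event(start: datetime, interval: timedelta, now: datetime) -> datetime:
    """Hypothetical interval-clock model: return the first
    start + k * interval (k >= 0) that falls strictly after `now`."""
    if now < start:
        return start
    elapsed_intervals = (now - start) // interval  # whole intervals already passed
    return start + (elapsed_intervals + 1) * interval


start = datetime(2019, 1, 1)
# Mid-afternoon on 2020-09-23: the next daily event is midnight on 2020-09-24,
# consistent with the "Waiting for next scheduled run" message in the question
print(next_event(start, timedelta(hours=24), datetime(2020, 9, 23, 15, 30)))
```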