
Is there a way to update parameters for each scheduled flow run?


I'm trying to find a way to update the parameters being used for each iteration of a scheduled flow.

For example, say I have a flow that's scheduled to run once every Monday for a year. For the first Monday, the flow needs to run with a parameter of, say, 5. The next Monday it needs to run with a parameter of 7, and so on. The parameter needed for each week's run changes by a constant amount.

Based on the docs, it looks like I could create a clock with the corresponding parameter for each run, but that seems excessive for flows that are scheduled for many runs.

Is there a simpler way of doing this in Prefect?


Solution

  • This sounds like you want to encapsulate some form of business logic into your Parameter calculation, which is a great use case for adding a new Task to your Flow:

    import prefect
    from prefect import task, Flow, Parameter
    
    
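    # A default of None means no explicit value was supplied for this run
    varying_param = Parameter("param", default=None)
    
    @task(name="Varying Parameter")
    def param_calculation(p):
        # scheduled start time of the current flow run, read from context at runtime
        scheduled_time = prefect.context.scheduled_start_time
    
        # if a value was provided, use it
        if p is not None:
            return p
    
        # otherwise, do some calculations (e.g. based on scheduled_time)
        # to decide what value is appropriate, and return it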
    
    
    with Flow("Minimal example") as flow:
        param_value = param_calculation(varying_param)
        # now use this value in downstream tasks
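
For the example in the question (5 on the first Monday, 7 on the next, and so on), the "do some calculations" step could be sketched roughly as follows. This is an illustration, not part of the original answer: `weekly_param`, `FIRST_RUN`, `base`, and `step` are hypothetical names, and the dates are made up. The function uses only the standard library so it can be checked without Prefect. Inside the task it would presumably be called with `prefect.context.scheduled_start_time`.

```python
from datetime import datetime

# Hypothetical anchor: the scheduled time of the very first Monday run.
FIRST_RUN = datetime(2021, 1, 4, 9, 0)


def weekly_param(scheduled_start, first_run=FIRST_RUN, base=5, step=2):
    """Return base + step * (whole weeks elapsed since first_run)."""
    # Compare calendar dates only, so the time of day does not affect the result.
    weeks_elapsed = (scheduled_start.date() - first_run.date()).days // 7
    return base + step * weeks_elapsed


print(weekly_param(datetime(2021, 1, 4, 9, 0)))   # first Monday
print(weekly_param(datetime(2021, 1, 11, 9, 0)))  # second Monday
print(weekly_param(datetime(2021, 1, 25, 9, 0)))  # fourth Monday
```

Because the value is derived from the scheduled start time rather than from stored state, a retried or late run for a given Monday should still get that Monday's value, and a manually supplied parameter still takes precedence through the `if p is not None` check.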