Using Prefect, I would like to create a new flow from two other flows.
The error I am getting is: A task with the slug "add_num" already exists in this flow.
Is it possible to update Flows that use the same tasks or Parameters? Below is a minimal example of what I am trying to accomplish.
from prefect import task, Flow, Parameter

@task
def add_one(x):
    return x + 1

with Flow("Flow 1") as flow_1:
    add_num = Parameter("add_num", default=10)
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y + 2

with Flow("Flow 2") as flow_2:
    # A second, separate Parameter object with the same name
    add_num = Parameter("add_num", default=10)
    new_num2 = add_two(add_num)

combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)  # raises the slug error
I did see this piece of code on the Slack channel that might help solve this problem, but I am not sure how to use it.
import uuid

class GlobalParameter(Parameter):
    def __init__(self, name, slug=None, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.slug = slug or uuid.uuid4()
Thanks in advance.
Because Parameters are uniquely identified by name in the API, you can't combine two flows that have different Parameters of the same name. However, what you can do is use a common Parameter within each flow as follows:
from prefect import task, Flow, Parameter

# Initialize the Parameter outside of any Flow context
add_num = Parameter("add_num", default=10)

@task
def add_one(x):
    return x + 1

with Flow("Flow 1") as flow_1:
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y + 2

with Flow("Flow 2") as flow_2:
    new_num2 = add_two(add_num)

combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)
Because the Parameter being used is actually the same instance of the Parameter class, your update will succeed.
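
To see why sharing the instance matters, here is a rough toy model in plain Python. It is not Prefect's actual implementation: `ToyTask` and `ToyFlow` are hypothetical stand-ins, and the assumption is only that a flow skips an object it already holds but rejects a different object that carries the same slug.

```python
class ToyTask:
    """Hypothetical stand-in for a task or Parameter; only the slug matters."""

    def __init__(self, slug):
        self.slug = slug


class ToyFlow:
    """Hypothetical stand-in for a flow; NOT Prefect's real Flow class."""

    def __init__(self, name):
        self.name = name
        self.tasks = set()

    def add_task(self, task):
        if task in self.tasks:
            # The very same object is already registered: nothing to do.
            return
        if any(t.slug == task.slug for t in self.tasks):
            # A different object with the same slug is a conflict.
            raise ValueError(
                f'A task with the slug "{task.slug}" already exists in this flow.'
            )
        self.tasks.add(task)

    def update(self, other):
        for task in other.tasks:
            self.add_task(task)


# Case 1: both flows hold the same object, so merging succeeds.
shared = ToyTask("add_num")
flow_a, flow_b = ToyFlow("a"), ToyFlow("b")
flow_a.add_task(shared)
flow_b.add_task(shared)
combined = ToyFlow("combined")
combined.update(flow_a)
combined.update(flow_b)
shared_count = len(combined.tasks)

# Case 2: each flow holds its own object with the same slug, so merging fails.
flow_c, flow_d = ToyFlow("c"), ToyFlow("d")
flow_c.add_task(ToyTask("add_num"))
flow_d.add_task(ToyTask("add_num"))
clash = ToyFlow("clash")
clash.update(flow_c)
try:
    clash.update(flow_d)
    clash_error = None
except ValueError as exc:
    clash_error = str(exc)
```

In this sketch the first merge ends up with a single `add_num` entry, while the second raises the same kind of slug error shown in the question.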