I have a problem in Dagster that I have not been able to solve.
My setup is as follows:
Step 1 gets data from an endpoint.
Step 2 gets a list of customers dynamically.
Step 3 updates the database with the response from step 1, once for each customer from step 2, in parallel.
Before calling step 3, an op named "parallelize_clients" creates a DynamicOutput for each client from step 2, so that the step 3 updates run in parallel. Finally, a graph joins the operations together.
from dagster import DynamicOut, DynamicOutput, graph, op

@op()
def step_1_get_response():
    return {'exemple': 'data'}

@op()
def step_2_get_client_list():
    return ['client_1', 'client_2', 'client_3']  # the number of customers is dynamic

@op(out=DynamicOut())
def parallelize_clients(context, client_list):
    # emit one DynamicOutput per client so downstream ops fan out
    for client in client_list:
        yield DynamicOutput(client, mapping_key=str(client))

@op()
def step_3_update_database_cliente(response, client):
    ...  # update the client's database with the response

@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    # run the functions in parallel
    clients.map(lambda client: step_3_update_database_cliente(response, client))
According to the documentation, an @op starts as soon as its dependencies are fulfilled, and ops with no dependencies start immediately, in no particular order. For example, step 1 and step 2 have no dependencies, so both run in parallel automatically. Once the client list is returned, "parallelize_clients()" runs, and finally the map in the graph dynamically creates one execution per client (one per DynamicOutput).
So far this works fine. Here is the problem: I need to run a specific function only once step 3 has completely finished. Because step 3 is created dynamically, several executions run in parallel, and I have not found a way to run a function only after all of those parallel executions are done.
I tried calling an op, "exemplolaststep()" (step 4), at the end of the graph, but step 4 then runs together with step 1 and step 2. I want step 4 to run only after step 3, and I cannot get this to work. Could someone help me?
I tried to create a fake dependency with:
@op(ins={"start": In(Nothing)})
def step_4():
    pass
Then, in the graph, I tried passing the result of the map call as the input to step_4(). For example:
@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    # run the functions in parallel
    step_4(start=clients.map(lambda client: step_3_update_database_cliente(response, client)))
I have tried other approaches as well, but to no avail.
You just need to add a .collect() call on the mapped function in your graph, to indicate that all the parallel operations should join before moving on. Something like:
@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    # run the functions in parallel
    step_4(
        start=clients.map(
            lambda client: step_3_update_database_cliente(response, client)
        ).collect()
    )