Dagster: Execute an @op only when all parallel executions are finished (DynamicOutput)


I have a problem in Dagster that I have not been able to solve.

I have the following setup:

Step 1 fetches data from an endpoint.

Step 2 gets a list of customers dynamically.

Step 3 updates the database with the response from step 1, once for each customer from step 2, in parallel.

Before calling step 3, an op named "parallelize_clients" yields a DynamicOutput for each client from step 2, so that the step 3 updates run in parallel. Finally, a graph wires the operations together.

from dagster import DynamicOut, DynamicOutput, graph, op

@op()
def step_1_get_response():
    return {'exemple': 'data'}

@op()
def step_2_get_client_list():
    return ['client_1', 'client_2', 'client_3'] #the number of customers is dynamic.

@op(out=DynamicOut())
def parallelize_clients(context, client_list):
    for client in client_list:
        yield DynamicOutput(client, mapping_key=str(client))


@op()
def step_3_update_database_cliente(response, client):
    ...  # placeholder: update the client's database here

@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    # run the functions in parallel
    clients.map(lambda client: step_3_update_database_cliente(response, client))

According to the documentation, an op starts as soon as its dependencies are fulfilled, and ops with no dependencies start immediately, in no guaranteed order. For example, my step 1 and step 2 have no dependencies, so both run in parallel automatically. Once the client list is returned, "parallelize_clients()" runs, and finally the map in the graph dynamically creates one execution of step 3 per client (one per DynamicOutput).

So far this works. Here is the problem: I need to run a specific op only when step 3 has completely finished. Because step 3 is created dynamically, several executions run in parallel, and I cannot find a way to run an op only after all of those parallel executions have finished.

In the graph I tried calling an op, "exemplolaststep()" (step 4), at the end. However, step 4 then runs together with step 1 and step 2, whereas I want it to run only after step 3, and I cannot get this to work. Could someone help me?

I tried to create a fake dependency with a Nothing input:

from dagster import In, Nothing, op

@op(ins={"start": In(Nothing)})
def step_4():
    pass

and in the graph, I passed the result of the map call as the "start" input of step_4(). Example:

@graph()
def job_exemple_graph():
    response = step_1_get_response()
    clients_list = step_2_get_client_list()
    clients = parallelize_clients(clients_list)
    # run the functions in parallel
    step_4(start=clients.map(lambda client: step_3_update_database_cliente(response, client)))

I have tried other approaches as well, but to no avail.


Solution

  • You just need to add a .collect() call on the result of the map in your graph, which tells Dagster that all the parallel executions must finish (fan in) before the downstream op runs. Something like:

    @graph()
    def job_exemple_graph():
        response = step_1_get_response()
        clients_list = step_2_get_client_list()
        clients = parallelize_clients(clients_list)
        # run the functions in parallel
        step_4(
            start=clients.map(
                lambda client: step_3_update_database_cliente(response, client)
            ).collect()
        )