Tags: spring-boot, spring-webflux, reactor

Sequential execution of reactive tasks in Reactor (Java)


I'm converting a blocking, sequential orchestration framework to a reactive one. The tasks are dynamic and are supplied to the engine via JSON input. The engine loads the task classes, executes each task's run() method, and saves the state along with the response from each task.
How do I achieve the same chaining in Reactor? If this were a static DAG, I would chain the tasks with flatMap or then operators, but since the list is dynamic, how do I execute each reactive task in turn and collect its output?

Examples:
Non-reactive interface:

public interface OrchestrationTask {
 OrchestrationContext run(IngestionContext ctx);
}

Core engine:

public Status executeDAG(String id) {
  IngestionContext ctx = ContextBuilder.getCtx(id);
  List<OrchestrationTask> tasks = app.getEligibleTasks(id);

  for(OrchestrationTask task : tasks) {
    // Eligible tasks are executed sequentially and results are collected.
    OrchestrationContext stepContext = task.run(ctx);
    if(!evaluateResult(stepContext)) break;
  }
  return Status.SUCCESS;
}

Following the above example, if I convert the tasks to return Mono<?>, how do I wait for a task, or chain later tasks so that they operate on the result of the previous ones? Any help is appreciated. Thanks.

Update:

Reactive task example:

public class SampleTask implements OrchestrationTask {
  // Assumes OrchestrationTask.run() has been changed to return Mono<OrchestrationContext>.
  @Override
  public Mono<OrchestrationContext> run(OrchestrationContext context) {
    // Simulated delay standing in for a long-running task (e.g. a web call).
    // The next task needs the response from this call.
    return Mono.just(context).delayElement(Duration.ofSeconds(2));
  }
}

So I will have a series of tasks that accomplish various things, but each task depends on the response of the previous one, which is stored in the OrchestrationContext. Whenever an error occurs, a flag in the orchestration context is set to false and the flux should stop.


Solution

  • Sure, we can:

    • Create a Flux from the task list (if it makes sense to generate the task list reactively, you can replace the list with a Flux directly; if not, keep it as-is);
    • flatMap() each task to your task.run() method (which, as per the question, now returns a Mono);
    • Ensure we only consume elements while evaluateResult() is true;
    • ...then finally just return the SUCCESS status as before.

    Putting all that together, just replace your loop and return statement with:

    Flux.fromIterable(tasks)
            .flatMap(task -> task.run(ctx))
            .takeWhile(stepContext -> evaluateResult(stepContext))
            .then(Mono.just(Status.SUCCESS));
    

    (Since we've made it reactive, your method will obviously need to return a Mono<Status> rather than just Status too.)
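To make the return-type change concrete, here is a minimal, self-contained sketch of what the rewritten method might look like. It is not the asker's real code: `Task`, `Ctx`, and the one-value `Status` enum are hypothetical stand-ins for `OrchestrationTask`, `OrchestrationContext`, and `Status`, `evaluateResult` is assumed to simply read a success flag, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveDagSketch {
    // Hypothetical stand-in for the question's Status type.
    enum Status { SUCCESS }

    // Hypothetical stand-in for OrchestrationContext: carries only a success flag.
    static final class Ctx {
        final boolean ok;
        Ctx(boolean ok) { this.ok = ok; }
    }

    // Hypothetical reactive counterpart of OrchestrationTask.
    interface Task {
        Mono<Ctx> run(Ctx ctx);
    }

    // Assumption: evaluateResult() just inspects the flag on the step context.
    static boolean evaluateResult(Ctx stepContext) {
        return stepContext.ok;
    }

    // Same pipeline as above, wrapped in a method that returns Mono<Status>.
    static Mono<Status> executeDag(List<Task> tasks, Ctx ctx) {
        return Flux.fromIterable(tasks)
                .flatMap(task -> task.run(ctx))
                .takeWhile(ReactiveDagSketch::evaluateResult)
                .then(Mono.just(Status.SUCCESS));
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
                c -> Mono.just(new Ctx(true)),
                c -> Mono.just(new Ctx(true)));
        // block() is used only to observe the result in this demo;
        // real reactive code would return the Mono to its caller.
        Status status = executeDag(tasks, new Ctx(true)).block();
        System.out.println(status); // prints SUCCESS
    }
}
```

Note that, like the original loop, this sketch reports SUCCESS even when a step fails and the sequence stops early; mapping a failed step to a different status would be a separate change.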

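    Update as per the comment: if you want the tasks to execute one at a time rather than concurrently, use concatMap() instead of flatMap(). flatMap() subscribes to the inner publishers eagerly, so several tasks can be in flight at once and their results may arrive out of order; concatMap() subscribes to each task only after the previous one has completed.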
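One thing the pipeline above does not do is hand each task the output of the previous one: every task receives the same `ctx`. If each step must see the context returned by the step before it, as the question describes, one possible approach is to fold the task list into a single `Mono` chain. The following is only a sketch under assumptions: `Ctx`, `Task`, and `step` are hypothetical names, the context is modelled as an immutable list of outputs plus a success flag, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class ChainedTasksSketch {
    // Hypothetical stand-in for OrchestrationContext:
    // an immutable list of step outputs plus a success flag.
    static final class Ctx {
        final List<String> outputs;
        final boolean ok;
        Ctx(List<String> outputs, boolean ok) {
            this.outputs = List.copyOf(outputs);
            this.ok = ok;
        }
        Ctx append(String output, boolean stillOk) {
            List<String> next = new ArrayList<>(outputs);
            next.add(output);
            return new Ctx(next, stillOk);
        }
    }

    // Hypothetical reactive task: receives the context produced by the previous step.
    interface Task {
        Mono<Ctx> run(Ctx previous);
    }

    static Mono<Ctx> runSequentially(List<Task> tasks, Ctx initial) {
        Mono<Ctx> chain = Mono.just(initial);
        for (Task task : tasks) {
            // task.run(...) is invoked only once the previous step has emitted its
            // context; after a failed step the context is passed through unchanged,
            // so the remaining tasks are skipped.
            chain = chain.flatMap(ctx -> ctx.ok ? task.run(ctx) : Mono.just(ctx));
        }
        return chain;
    }

    // Hypothetical helper: a task that records its name after a short simulated delay.
    static Task step(String name, boolean succeeds) {
        return ctx -> Mono.just(ctx.append(name, succeeds))
                .delayElement(Duration.ofMillis(50));
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
                step("fetch", true), step("validate", false), step("publish", true));
        // block() is for demonstration only.
        Ctx result = runSequentially(tasks, new Ctx(List.of(), true)).block();
        System.out.println(result.outputs + " ok=" + result.ok);
        // prints [fetch, validate] ok=false
    }
}
```

Nothing runs until the returned `Mono` is subscribed, and each step starts only after the previous one has emitted. The loop builds one nested operator per task, which should be fine for modest task lists but may be worth revisiting for very long ones.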