I'm working on converting a blocking, sequential orchestration framework to reactive. Right now the tasks are dynamic and are fed into the engine via JSON input. The engine loads the task classes, executes each one's run() method, and saves the state with the responses from each task.
How do I achieve the same chaining in Reactor? If this were a static DAG, I would have chained it with the flatMap or then operators, but since it is dynamic, how do I execute each reactive task and collect the output from each one?
Examples:
Non-reactive interface:
public interface OrchestrationTask {
    OrchestrationContext run(IngestionContext ctx);
}
Core engine:
public Status executeDAG(String id) {
    IngestionContext ctx = ContextBuilder.getCtx(id);
    List<OrchestrationTask> tasks = app.getEligibleTasks(id);
    for (OrchestrationTask task : tasks) {
        // Eligible tasks are executed sequentially and results are collected.
        OrchestrationContext stepContext = task.run(ctx);
        if (!evaluateResult(stepContext)) break;
    }
    return Status.SUCCESS;
}
Following the above example, if I convert the tasks to return Mono<?>, how do I wait for, or chain, other tasks so that they operate on the result of the previous tasks? Any help is appreciated. Thanks.
Update:
Reactive task example:
public class SampleTask implements OrchestrationTask {
    @Override
    public Mono<OrchestrationContext> run(OrchestrationContext context) {
        // I'm simulating a delay here. Treat this as a long-running task (a web call),
        // but note that the next task needs the response from this call.
        // Mono exposes delayElement (singular); delayElements is the Flux operator.
        return Mono.just(context).delayElement(Duration.ofSeconds(2));
    }
}
So I will have a series of tasks that accomplish various things, but each task depends on the response of the previous one, which is stored in the OrchestrationContext. Whenever an error occurs, the flag in the orchestration context will be set to false and the flux should stop.
Sure, we can:
- create a Flux from the list of tasks;
- flatMap() each task to your task.run() method (which, as per the question, now returns a Mono);
- make sure we only consume elements while evaluateResult() is true;
- finally, return the SUCCESS status as before.
So putting all that together, just replace your loop & return statement with:
Flux.fromIterable(tasks)
    .flatMap(task -> task.run(ctx))
    .takeWhile(stepContext -> evaluateResult(stepContext))
    .then(Mono.just(Status.SUCCESS));
(Since we've made it reactive, your method will obviously need to return a Mono<Status> rather than just Status too.)
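As a dependency-free illustration of the takeWhile() step, here is a minimal sketch using java.util.stream.Stream.takeWhile (Java 9+), which applies the same rule on a synchronous stream: elements are relayed while the predicate holds, and the first failing element and everything after it are dropped. The class name, the relayed() helper, and the "FAIL" prefix are hypothetical stand-ins for the step contexts and evaluateResult(); this is not Reactor code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only: strings stand in for step contexts.
final class TakeWhileSketch {

    // Returns the step results that are relayed before the first failing one.
    static List<String> relayed(List<String> stepResults) {
        return stepResults.stream()
                // Stand-in for evaluateResult(): a result is "good" unless it starts with FAIL.
                .takeWhile(result -> !result.startsWith("FAIL"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(relayed(List.of("ok-1", "ok-2", "FAIL-3", "ok-4")));
    }
}
```

Note that the failing result itself is not relayed, so anything that needs to inspect it has to do so before this step.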
Update as per the comment: if you just want this to execute "one at a time" rather than with multiple tasks running concurrently, you can use concatMap() instead of flatMap().
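To make the "one at a time, each task sees the previous result" behaviour concrete without pulling in Reactor, here is a minimal sketch that uses java.util.concurrent.CompletableFuture as a stand-in for Mono. It is an analogue, not the Reactor pipeline above: thenCompose plays roughly the role that flatMap plays on a Mono. All names here (SequentialChainSketch, Ctx, Task, task, executeDag) are hypothetical and not part of the question's framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the question's types; not the real framework classes.
final class SequentialChainSketch {

    // Minimal context: a success flag plus the names of the tasks that have run.
    static final class Ctx {
        final boolean ok;
        final List<String> log;

        Ctx(boolean ok, List<String> log) {
            this.ok = ok;
            this.log = List.copyOf(log);
        }

        // Returns a new context with one more log entry and an updated flag.
        Ctx append(String entry, boolean stillOk) {
            List<String> next = new ArrayList<>(log);
            next.add(entry);
            return new Ctx(stillOk, next);
        }
    }

    // Asynchronous task: receives the previous context, eventually yields the next one.
    interface Task {
        CompletableFuture<Ctx> run(Ctx previous);
    }

    // Builds a task that records its name and sets the flag to the given value.
    static Task task(String name, boolean succeeds) {
        return previous -> CompletableFuture.supplyAsync(() -> previous.append(name, succeeds));
    }

    // Folds the dynamic task list into a single chain. Each task starts only after
    // the previous result is available, and is skipped once the flag is false.
    static CompletableFuture<Ctx> executeDag(List<Task> tasks, Ctx initial) {
        CompletableFuture<Ctx> chain = CompletableFuture.completedFuture(initial);
        for (Task t : tasks) {
            chain = chain.thenCompose(ctx ->
                    ctx.ok ? t.run(ctx) : CompletableFuture.completedFuture(ctx));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(task("a", true), task("b", false), task("c", true));
        Ctx result = executeDag(tasks, new Ctx(true, List.of())).join();
        System.out.println(result.ok + " " + result.log);
    }
}
```

In this sketch the third task never runs, because the second one clears the flag. Unlike the takeWhile() pipeline, the failing context is kept and returned, so the caller can inspect it.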