I've been working with Project Reactor for a while but I'm fairly new to SI, especially when it comes to using reactive components in SI. This might be very basic but I still can't figure it out, and the more I read the Spring docs the less I understand.
I have this flow where I'm using a reactive adapter that returns Mono<Void>.
And as I understand, once you're using a handler/gateway that returns either void or Mono<Void> (one-way 'MessageHandler') then that signals the end of the flow and you can't add anything else downstream.
So that leaves me with this:
@Bean
public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
    return f -> f
            // transformations, routing and other stuff..
            .handle(adapter::handleMessage);
}
Now. How will the subscription to that Mono be done? What if I want to subscribe with a channel to that Mono? Even if the Mono is not emitting any items I'd like to be able to subscribe to the completion signal.
If, for example, I use the MongoDb.reactiveOutboundChannelAdapter(...), when I run the flow I can see that the payload is in fact being stored in Mongo. So I assume the subscription is being done automatically?
I guess my intention would be to get some kind of receipt that the operation was done.
Thanks for the help!
The MongoDb.reactiveOutboundChannelAdapter(...) is represented by the ReactiveMongoDbStoringMessageHandler, wrapped in a ReactiveMessageHandlerAdapter, which is what really makes that subscription happen automatically.
Actually the same is going to happen to any Publisher reply if the service activator is marked as async. So, if your adapter::handleMessage returns Mono<Void>, you can do it like this and you'll get the completion of that Mono:
@Bean
public IntegrationFlow reactiveStore() {
    return f -> f
            .<Object>handle((p, h) -> Mono.empty(),
                    e -> e.async(true)
                            .customizeMonoReply((message, mono) ->
                                    mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
}
And I get this in logs:
Completed: null
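The null there is expected: a Mono<Void> never emits an item, so the success callback fires on the completion signal alone and is handed null as its data. To illustrate just that part without any Spring or Reactor dependency, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for Mono. The class and method names (EmptyCompletionDemo, emptyPublisher, subscribeAndReport) are made up for this example and are not part of Reactor or Spring Integration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

class EmptyCompletionDemo {

    // Hypothetical stand-in for a Mono<Void>: emits no items, only a completion signal.
    static Flow.Publisher<Void> emptyPublisher() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done) {
                    done = true;
                    subscriber.onComplete(); // nothing to emit, so complete right away
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes and returns what a success-style callback would report.
    static String subscribeAndReport(Flow.Publisher<Void> publisher) {
        AtomicReference<Void> lastValue = new AtomicReference<>(); // stays null, onNext is never called
        AtomicReference<String> report = new AtomicReference<>("No terminal signal");
        publisher.subscribe(new Flow.Subscriber<Void>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1); // demand is required before any signal arrives
            }

            @Override
            public void onNext(Void item) {
                lastValue.set(item);
            }

            @Override
            public void onError(Throwable error) {
                report.set("Failed: " + error);
            }

            @Override
            public void onComplete() {
                report.set("Completed: " + lastValue.get());
            }
        });
        return report.get();
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndReport(emptyPublisher()));
    }
}
```

So the "receipt" you are after is the completion signal itself; there is no payload to wait for.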
I probably need to revise the ReactiveMessageHandlerAdapter, because it looks like any ReactiveMessageHandler can be handled directly and more smoothly. Please raise a GH issue so we don't lose our discussion.
Meanwhile, here are the docs to clarify this further: