spring-integration project-reactor

Subscribing to Mono<Void> from reactive one-way handlers in SI


I've been working with Project Reactor for a while, but I'm fairly new to SI, especially when it comes to using reactive components in SI. This might be very basic, but I still can't figure it out, and the more I read the Spring docs the less I understand.

I have this flow where I'm using a reactive adapter that returns Mono<Void>. As I understand it, once you use a handler/gateway that returns either void or Mono<Void> (a one-way 'MessageHandler'), that signals the end of the flow and you can't add anything else downstream. So that leaves me with this:

  @Bean
  public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
    return f -> f
     // transformations, routing and other stuff..
     .handle(adapter::handleMessage);
  }

Now, how is the subscription to that Mono done? What if I want to subscribe to that Mono with a channel? Even if the Mono does not emit any items, I'd like to be able to subscribe to the completion signal.

If, for example, I use the MongoDb.reactiveOutboundChannelAdapter(...) and run the flow, I can see that the payload is in fact being stored in MongoDB. So I assume the subscription is being done automatically?

I guess my intention would be to get some kind of receipt that the operation was done.

Thanks for the help!


Solution

  • The MongoDb.reactiveOutboundChannelAdapter(...) is backed by a ReactiveMongoDbStoringMessageHandler wrapped in a ReactiveMessageHandlerAdapter, and it is that adapter that performs the subscription automatically.

    Actually, the same happens for any Publisher reply if the service activator is marked as async. So, if your adapter::handleMessage returns Mono<Void>, you can do something like this and you'll be notified when that Mono completes:

        @Bean
        public IntegrationFlow reactiveStore() {
            return f -> f
                    .<Object>handle((p, h) -> Mono.empty(),
                            e -> e.async(true)
                                    .customizeMonoReply((message, mono) ->
                                            mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
        }
    

    And I get this in logs:

    Completed: null
    

    I probably need to revise the ReactiveMessageHandlerAdapter, because it looks like any ReactiveMessageHandler could be handled directly and more smoothly. Please raise a GH issue so we don't lose this discussion.

    Meanwhile, here are the docs that should clarify this further:

    https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-message-handler

    https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#reactive-advice
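
A note on why the log shows "Completed: null": a Mono<Void> never emits an item, so the only thing a subscriber can observe is the terminal completion signal, and doOnSuccess is invoked with null in that case. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow interfaces rather than Reactor or Spring Integration, purely so it runs without extra dependencies. The class name CompletionSignalDemo and its methods are hypothetical, and the publisher is deliberately simplified (it ignores demand validation), so treat it as an analogy and not as how Reactor is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class: JDK Flow stands in for Reactor's Mono<Void>.
class CompletionSignalDemo {

    // A publisher that never emits an item and only signals completion,
    // which is the observable shape of a Mono<Void>.
    static Flow.Publisher<Void> emptyPublisher() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done) {
                    done = true;
                    subscriber.onComplete(); // no onNext, just the terminal signal
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes to the empty publisher and records every signal that arrives.
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        emptyPublisher().subscribe(new Flow.Subscriber<Void>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("subscribed");
                subscription.request(1);
            }

            @Override
            public void onNext(Void item) {
                signals.add("item"); // never reached for an empty publisher
            }

            @Override
            public void onError(Throwable error) {
                signals.add("error");
            }

            @Override
            public void onComplete() {
                signals.add("completed"); // this is the "receipt"
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [subscribed, completed]
    }
}
```

The "receipt" asked about in the question corresponds to the onComplete callback here; in the flow above, that role is played by the doOnSuccess hook added through customizeMonoReply.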