Tags: spring-boot, spring-integration, project-reactor, spring-integration-amqp

Does Spring Integration ServiceActivator work with Project Reactor Types?


I'm using Spring Integration 5.1.5 (with RabbitMQ via spring-integration-amqp), and I read in the docs that Spring Integration supports Project Reactor types (Mono, Flux, etc.). However, I cannot get this to work for a @ServiceActivator method. I'm trying something like this:

@ServiceActivator
public Mono<Void> myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   return result;
}

(Note that I'm also trying to get myMethod working with Flux<Message>, but that's a separate issue.)

When myMethod returns Mono<Void> I get this error:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    ... 42 common frames omitted

Changing the method to:

@ServiceActivator
public void myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   result.subscribe(); // This is not what I want to do
}

and subscribing to the reactive stream manually makes it work, but that is not what I want to do. I would expect the Spring Integration framework to handle the subscription.

Is this supported in Spring Integration? If so, what am I doing wrong?


Solution

  • What exactly are you trying to achieve by returning Mono<Void>?

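    When a service activator method returns a value, the framework treats that value as a reply and sends it to the output channel (or to the replyChannel header). If neither is configured, you get the DestinationResolutionException shown in the question. When the value is a Mono<?>, the send is performed when the mono completes.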

    Simply set the return type to void.
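
    The routing rule described above can be sketched as a toy model in plain Java. This is not Spring Integration code: ReplyRoutingSketch, RecordingChannel, and handle are hypothetical names invented for illustration, and CompletableFuture is only a stand-in for Mono. It assumes nothing beyond the behaviour already stated: no return value means no reply, any other value needs a channel, and a deferred value is sent once it completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model only: none of these types are Spring Integration classes.
final class ReplyRoutingSketch {

    /** Stand-in for an output channel: it just records what was sent. */
    static final class RecordingChannel {
        final List<Object> sent = new ArrayList<>();

        void send(Object payload) {
            sent.add(payload);
        }
    }

    /**
     * Invokes the handler and routes its result.
     * A null result (what a void method yields) means "no reply";
     * any other result needs a channel to be sent to.
     */
    static void handle(Function<String, Object> handler,
                       String request,
                       RecordingChannel outputChannel) {
        Object reply = handler.apply(request);
        if (reply == null) {
            return; // nothing to send, so no channel is needed
        }
        if (outputChannel == null) {
            throw new IllegalStateException(
                    "no output-channel or replyChannel header available");
        }
        if (reply instanceof CompletableFuture) {
            // Deferred result (stand-in for Mono): send only on completion.
            CompletableFuture<?> future = (CompletableFuture<?>) reply;
            future.thenAccept(value -> outputChannel.send(value));
        } else {
            outputChannel.send(reply);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();

        // 1. void-style handler: no reply, so a missing channel is fine.
        handle(request -> null, "a", null);

        // 2. Deferred reply: nothing is sent until the future completes.
        CompletableFuture<String> pending = new CompletableFuture<>();
        handle(request -> pending, "b", channel);
        System.out.println("before completion: " + channel.sent);
        pending.complete("done");
        System.out.println("after completion: " + channel.sent);

        // 3. Non-null reply without a channel: rejected, as in the question.
        try {
            handle(request -> "reply", "c", null);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Case 1 is what a void @ServiceActivator method amounts to: with no return value there is no reply to route, so no output channel is looked up. Case 3 mirrors the situation in the question, where the returned Mono<Void> counts as a reply even though it carries no data.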