I'm using Spring Integration 5.1.5 (with RabbitMQ via spring-integration-amqp), and I read in the docs that Spring Integration supports Project Reactor types (by this I mean Mono, Flux, etc.). But I cannot get this to work for a @ServiceActivator method. I'm trying something like this:
@ServiceActivator
public Mono<Void> myMethod(List<Message> messages) {
    Mono<Void> result = myService.doServiceStuff(messages);
    return result;
}
(Note that I'm also trying to get myMethod working with Flux<Message>, but that's a separate issue.)
When myMethod returns Mono<Void>, I get this error:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
... 42 common frames omitted
Changing the method to:
@ServiceActivator
public void myMethod(List<Message> messages) {
    Mono<Void> result = myService.doServiceStuff(messages);
    result.subscribe(); // This is not what I want to do
}
and subscribing to the reactive stream manually makes it work, but this is obviously not what I want to do. I would rather expect the spring-integration framework to handle the subscription.
Is this supported in Spring Integration? If so, what am I doing wrong?
Exactly what are you trying to achieve by returning Mono<Void>?
When a service activator method returns any value, that value is sent to the output channel. When it's a Mono<?>, the send is performed when the mono completes.
Simply set the return type to void.
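To make the routing rule above concrete, here is a minimal toy model in plain Java. It is not Spring Integration's implementation: the class ReplyRoutingSketch and its dispatch method are hypothetical names made up for illustration, and CompletableFuture stands in for Mono only so that the sketch runs without any dependencies. It shows the three cases discussed: a void-style handler produces nothing to route, a returned value with no output channel fails, and a deferred value is forwarded only once it completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model only: hypothetical names, NOT Spring Integration classes.
class ReplyRoutingSketch {

    // Calls the handler and routes any non-null reply to the output channel.
    static void dispatch(String request,
                         Function<String, Object> handler,
                         Consumer<Object> outputChannel) {
        Object reply = handler.apply(request);
        if (reply == null) {
            return; // void-style handler: there is no reply to route
        }
        if (outputChannel == null) {
            // Analogous to the "no output-channel or replyChannel" error in the question
            throw new IllegalStateException("no output channel available");
        }
        if (reply instanceof CompletableFuture) {
            // Deferred reply (stand-in for Mono): send only when it completes
            ((CompletableFuture<?>) reply).thenAccept(value -> outputChannel.accept(value));
        } else {
            outputChannel.accept(reply);
        }
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();

        // 1. No reply, no channel: nothing happens, nothing fails.
        dispatch("a", request -> null, null);

        // 2. A reply but no channel: fails, as in the question.
        try {
            dispatch("b", request -> new CompletableFuture<String>(), null);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }

        // 3. A deferred reply with a channel: sent only after completion.
        CompletableFuture<String> pending = new CompletableFuture<>();
        dispatch("c", request -> pending, sent::add);
        System.out.println("before completion: " + sent);
        pending.complete("done");
        System.out.println("after completion: " + sent);
    }
}
```

In this model, case 2 corresponds to returning Mono<Void> without an output channel, and case 1 corresponds to the void return type suggested above.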