Search code examples
javaspring-integration

Chain handlers returning Mono


I am trying to make a flow that starts from a WebFlux.inboundChannelAdapter, use a first handler that returns a Mono, and then a second one that is imperative. Eventually I would like the REST call to return only once the flow has completed.

From what I understood from the Reactive Streams Support documentation I should not have to modify my imperative handler to take a Mono as parameter. Unfortunately I keep facing the exception:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@7f08caf] (simple.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String, failedMessage=GenericMessage [payload=MonoJust, headers={id=13de5ed2-0100-9b5c-323b-4d213a05a582, timestamp=1689592273907}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$6(FluxMessageChannel.java:147)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:105)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:105)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    ... 54 more

Spring integration version: 5.5.18 Java: 1.8

I reproduce the issue with a small demo application:

@Bean
IntegrationFlow flow() {

    return IntegrationFlows
            .from(WebFlux.inboundChannelAdapter("/start")
                    .requestMapping(m -> m.methods(HttpMethod.GET))
            )
            .handle((p,h) -> Mono.just("foo")) //In reality the is a WebClient involved here
            .handle(toUpperCase())
            .get();
}

GenericHandler<String> toUpperCase() {
    return (p,h) -> p.toUpperCase();
}

I have tried a combination of .channel(c -> c.flux()) and .reactive() on handler's ConsumerEndpointSpec but without success.

Could you please help me to figure out a solution ?


Solution

  • So, you do this:

     .handle((p,h) -> Mono.just("foo"))
     .handle(toUpperCase())
    

    The Framework doesn't know (and really doesn't care) that you return a Mono from the first handler. The second handler expects from us a String as an input payload type, but your one is a Mono.

    For that old version I would suggest to fix it like this:

    .handle((p,h) -> Mono.just("foo"), e -> e.async(true)).
    

    It covers both Future and Reactive Streams Publisher reply message handling.