Search code examples
javaspringspring-integrationspring-java-config

LambdaMessageProcessor doesn't recognize payload type for ConversionService


I am using Spring Integration 4.2.5.RELEASE and Spring Integration Java DSL 1.1.2.RELEASE.

I am having trouble getting a custom conversion to work. I have registered the custom converter to convert from byte[] to my.object.MyClass

@Bean
@IntegrationConverter
public Converter bytesToMyClass() {
    return new Converter<byte[], my.object.MyClass>() {
        @Override
        public my.object.MyClass convert(byte[] source) {

            try {
                return my.object.MyClass.newBuilder().mergeFrom(source).build();
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Could not convert message.", e);
            }
        }
    };
}

Then I set up my integration flow. The goal is to route messages down one of two paths based on a variable called insertBuffer. If insertBuffer is > 1, then aggregate messages. Otherwise, just wrap the single message in a collection and send it to the service method. Here is my flow:

@Bean
public IntegrationFlow routeInput( MessageChannel input, MyClassService service ) {

    return IntegrationFlows.from(input)
            .<my.object.MyClass, Boolean>route((my.object.MyClass payload) -> insertBuffer > 1, mapping -> mapping
                .subFlowMapping("true", aggregateflow -> aggregateflow
                    .<my.object.MyClass, Collection<my.object.MyClass>>aggregate(a -> a
                            .correlationStrategy(message -> 0) //all messages are part of the same group for now.
                            .releaseStrategy(group -> group.size() >= insertBuffer)
                            .sendPartialResultOnExpiry(true)
                            .expireGroupsUponCompletion(true)
                            .expireGroupsUponTimeout(true)
                            .groupTimeout(2000)))
                .subFlowMapping("false", single -> single
                    .<my.object.MyClass, Collection<my.object.MyClass>>transform(Arrays::asList)
                ))
            .handle(Collection.class, (payload, headers) ->
                    service.saveResult(payload))
            .get();
}

However, when I try running this, I get the following Exception java.lang.ClassCastException: [B cannot be cast to my.object.MyClass (full stack below).

After some debugging, I notice that when org.springframework.integration.dsl.LambdaMessageProcessor#processMessage is trying to process the message, the payloadType is java.lang.Object when I think it should be my.object.MyClass.

It seems like I have all of my generics correct, what am I missing?

Full stacktrace:

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: [B cannot be cast to my.object.MyClass
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
    at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelKeys(AbstractMessageProcessingRouter.java:80)
    at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:148)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:154)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:103)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to my.object.MyClass
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:127)
    ... 37 more

Solution

  • The problem with Lambda that it can't determine generic type at runtime: Java: how to resolve generic type of lambda parameter?

    That's why we have there an overloaded method on the matter:

     .<my.object.MyClass, Boolean>route(my.object.MyClass.class, payload -> insertBuffer > 1, mapping -> mapping