spring-integration spring-integration-dsl spring-integration-amqp

How can I pass an object `Message` to the route?


I have created a flow that consumes messages from RabbitMQ and then uses a router to distribute them to the appropriate service by type. The service methods take a Message<?> argument because I need to use the headers there. But in these methods I receive only the message payload, of type java.lang.String, instead of an org.springframework.messaging.Message, and I get the error java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message.

The payload alone isn't suitable for me, because I need to read the headers from the message.

@Bean
public IntegrationFlow testFlow(String queueName,
                                ConnectionFactory connectionFactory,
                                Service1 service1,
                                Service2 service2) {
    SimpleMessageListenerContainer consumerListener = new SimpleMessageListenerContainer(connectionFactory);
    consumerListener.addQueueNames(queueName);
    return IntegrationFlows.from(Amqp.inboundAdapter(consumerListener))
            .transform(s -> s, ConsumerEndpointSpec::transactional)
            .<Message<?>, String>route(HeadersUtil::getType, m -> m
                    .subFlowMapping(Type.SERVICE_1, sf -> sf.handle(service1::handleProcedure))
                    .subFlowMapping(Type.SERVICE_2, sf -> sf.handle(service2::handleProcedure)))
            .get();
}

The signature of the method handleProcedure is as follows:

void handleProcedure(Message<?> message)

I expect to be able to read the Message headers in handleProcedure, but I currently get the exception instead.


Solution

  • I think you misread the stack trace.

    Your void handleProcedure(Message<?> message) method, referenced as service1::handleProcedure, fully matches the public B handle(MessageHandler messageHandler) method signature in IntegrationFlowDefinition.

    Your problem is here:

    .<Message<?>, String>route(HeadersUtil::getType, 
    

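    Your HeadersUtil::getType expects a Message, but the argument passed to the function at invocation is the payload, which is a String in your case. The <Message<?>, String> type arguments only satisfy the compiler; they are erased at runtime, so the framework cannot tell that you wanted the whole message.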

    This should work:

    .<Message<?>, String>route(Message.class, HeadersUtil::getType,
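
To illustrate why the extra Message.class argument makes a difference, here is a minimal plain-Java sketch of the dispatch decision. It is a simplified model, not Spring Integration's actual implementation: Msg, routeWithPayload, routeWithType and RouteArgumentSketch are hypothetical names standing in for Message, route(Function), route(Class, Function) and the framework, and getType plays the role of HeadersUtil::getType.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class RouteArgumentSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    static final class Msg<T> {
        final T payload;
        final Map<String, Object> headers;

        Msg(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Plays the role of HeadersUtil::getType: it needs the whole message.
    static String getType(Msg<?> message) {
        return (String) message.headers.get("type");
    }

    // Models route(Function): generic type arguments are erased, so there is
    // no runtime hint and the function is simply called with the payload.
    @SuppressWarnings("unchecked")
    static <P, T> T routeWithPayload(Msg<?> message, Function<P, T> router) {
        return router.apply((P) message.payload);
    }

    // Models route(Class, Function): the Class object survives erasure and
    // tells the dispatcher whether to pass the message or only its payload.
    @SuppressWarnings("unchecked")
    static <P, T> T routeWithType(Msg<?> message, Class<?> expectedType, Function<P, T> router) {
        Object argument = Msg.class.isAssignableFrom(expectedType) ? message : message.payload;
        return router.apply((P) argument);
    }

    public static void main(String[] args) {
        Msg<String> message =
                new Msg<>("body", Collections.<String, Object>singletonMap("type", "SERVICE_1"));

        try {
            // The String payload reaches getType, which expects a Msg.
            routeWithPayload(message, RouteArgumentSketch::getType);
        } catch (ClassCastException e) {
            System.out.println("Payload passed where a message was expected: " + e.getMessage());
        }

        // With the explicit type, the whole message is passed and the header is read.
        System.out.println(routeWithType(message, Msg.class, RouteArgumentSketch::getType));
    }
}
```

In this model the first call fails with a ClassCastException, the same kind of error as in the question, and the second returns the header value. One caveat worth checking in your own build: Message.class has the type Class<Message>, which the compiler may reject where the explicit <Message<?>, String> type arguments require a Class<Message<?>>. If that happens, omitting the explicit type arguments and letting the compiler infer them is one option.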