I create a flow, which consumes messages from RabbitMQ and after that distributes to the appropriate services by type using the router.
Methods in services take argument Message<?>
, because I need to use headers there. But in this method I receive only message payload with type java.lang.String
instead org.springframework.messaging.Message
and
I get error java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
.
Payload isn't suitable for me, because I need to get headers from message.
@Bean
public IntegrationFlow testFlow(String queueName,
ConnectionFactory connectionFactory,
Service1 service1,
Service2 service2) {
SimpleMessageListenerContainer consumerListener = new SimpleMessageListenerContainer(connectionFactory);
consumerListener.addQueueNames(queueName);
return IntegrationFlows.from(Amqp.inboundAdapter(consumerListener))
.transform(s -> s, ConsumerEndpointSpec::transactional)
.<Message<?>, String>route(HeadersUtil::getType, m -> m
.subFlowMapping(Type.SERVICE_1, sf -> sf.handle(service1::handleProcedure))
.subFlowMapping(Type.SERVICE_2, sf -> sf.handle(service2::handleProcedure)))
.get();
}
The signature of the method handleProcedure
is as follows:
void handleProcedure(Message<?> message)
I expect to get headers of Message
in the method handleProcedure
, but I get exception now.
I think you didn't understand the stack trace properly.
Your void handleProcedure(Message<?> message)
and its service1::handleProcedure
method reference fully fits to the public B handle(MessageHandler messageHandler) {
method signature in the IntegrationFlowDefinition
.
Your problem is here:
.<Message<?>, String>route(HeadersUtil::getType,
Your HeadersUtil::getType
expects a message, but the type for the lambda invocation is a payload
which is String
in your case.
This should work:
.<Message<?>, String>route(Message.class, HeadersUtil::getType,