Tags: spring, spring-integration, spring-amqp, spring-websocket

Having trouble with contentType going from AMQP to STOMP message


Here is what I have (in summary)

<int-amqp:inbound-channel-adapter 
    auto-startup="false" 
    listener-container="clientListenerContainer" 
    channel="fromRabbitClientChannel" 
    connection-factory="rabbitConnectionFactory" />

<bean id="clientListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
</bean>

<int:service-activator ref="subscriptionHandler" method="convertMessage" input-channel="fromRabbitClientChannel" output-channel="sendMessage" />

<bean id="stompSubProtocolHandler" class="org.springframework.web.socket.messaging.StompSubProtocolHandler"/>

<int-websocket:outbound-channel-adapter channel="sendMessage" 
                                            container="serverWebSocketContainer"
                                            default-protocol-handler="stompSubProtocolHandler"/>

Here is my subscriptionHandler convertMessage function

public Message<?> convertMessage(Message<?> message) {
    // The AMQP routing key identifies the client subscription this message belongs to.
    String subscriptionQuery = AmqpMessageHeaderAccessor.wrap(message).getReceivedRoutingKey();
    ClientSubscription subscription = this.subscriptions.getBySubscriptionQuery(subscriptionQuery);

    // Rebuild the message with the headers the STOMP handler uses to reach the client session.
    return MessageBuilder.withPayload(message.getPayload())
            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, subscription.getSessionId())
            .setHeader(SimpMessageHeaderAccessor.DESTINATION_HEADER, subscription.getSubscriptionQuery())
            .setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID_HEADER, subscription.getSubscriptionId())
            .build();
}

When messages come in through the <int-amqp:inbound-channel-adapter> and the SimpleMessageListenerContainer and reach my subscriptionHandler, the payload is a byte[] and the contentType header is set to application/octet-stream.

Then my convertMessage function creates a message from the payload with some STOMP headers and returns it to the sendMessage channel, where the stompSubProtocolHandler takes over. There, when it tries to convert the headers into a STOMP message, it throws a ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType.

Here is the stack trace.

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType
    at org.springframework.messaging.support.MessageHeaderAccessor.getContentType(MessageHeaderAccessor.java:457)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.updateStompHeadersFromSimpMessageHeaders(StompHeaderAccessor.java:169)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.<init>(StompHeaderAccessor.java:127)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.wrap(StompHeaderAccessor.java:478)
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.getStompHeaderAccessor(StompSubProtocolHandler.java:402)
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageToClient(StompSubProtocolHandler.java:329)
    at org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler.handleMessageInternal(WebSocketOutboundMessageHandler.java:151)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)

I feel like I am not adding a header that I need, or maybe I need to use a message converter... Or maybe I am just not doing it right altogether.

What do I need to do to the message to prepare it for the stompSubProtocolHandler?

Am I handling messages from AMQP correctly by using an <int-amqp:inbound-channel-adapter> with a SimpleMessageListenerContainer? (I need the container to dynamically add more queues as more subscriptions are made)


Solution

  • Good task though!

    Would you mind sharing it as a sample for the community once it is ready? You could submit it as a pull request to this project: https://github.com/spring-projects/spring-integration-samples.

    Since your stack trace says that the contentType is a String, you can just override it in your subscriptionHandler:

    AmqpMessageHeaderAccessor amqpMessageHeaderAccessor = AmqpMessageHeaderAccessor.wrap(message);
    ....
    .setHeader(MessageHeaders.CONTENT_TYPE, amqpMessageHeaderAccessor.getContentType())
    

    UPDATE

    BTW, we recently fixed that contentType ambiguity: https://jira.spring.io/browse/SPR-12730. With the latest Spring Framework 4.1.5 you no longer need to take care of the contentType conversion yourself; the framework does it automatically.
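
For anyone still on an earlier version, the idea behind the override can be sketched without any Spring dependency. This is only a model under stated assumptions: the nested `MimeType` class is a simplified stand-in for `org.springframework.util.MimeType`, and `readContentType` and `normalizeContentType` are hypothetical helpers written for illustration, not framework methods. The first helper mimics the unchecked cast seen in the stack trace; the second converts a textual `contentType` header into the typed value before the cast happens.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free model of the header problem; none of these types are Spring classes.
final class ContentTypeHeaderSketch {

    static final String CONTENT_TYPE = "contentType";

    /** Simplified stand-in for org.springframework.util.MimeType (assumption, not the real class). */
    static final class MimeType {
        final String type;
        final String subtype;

        MimeType(String type, String subtype) {
            this.type = type;
            this.subtype = subtype;
        }

        /** Parses "type/subtype"; parameters such as charset are ignored in this sketch. */
        static MimeType valueOf(String value) {
            int slash = value.indexOf('/');
            if (slash <= 0 || slash == value.length() - 1) {
                throw new IllegalArgumentException("Not a type/subtype pair: " + value);
            }
            return new MimeType(value.substring(0, slash), value.substring(slash + 1));
        }

        @Override
        public String toString() {
            return type + "/" + subtype;
        }
    }

    /** Hypothetical helper: casts the header without checking its type, like the failing call. */
    static MimeType readContentType(Map<String, Object> headers) {
        return (MimeType) headers.get(CONTENT_TYPE);
    }

    /** Hypothetical helper: returns a copy whose contentType header, if present, is a MimeType. */
    static Map<String, Object> normalizeContentType(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object value = copy.get(CONTENT_TYPE);
        if (value != null && !(value instanceof MimeType)) {
            copy.put(CONTENT_TYPE, MimeType.valueOf(value.toString()));
        }
        return copy;
    }

    public static void main(String[] args) {
        // The AMQP side delivers the content type as plain text.
        Map<String, Object> amqpHeaders = new HashMap<>();
        amqpHeaders.put(CONTENT_TYPE, "application/octet-stream");

        try {
            readContentType(amqpHeaders);
        } catch (ClassCastException e) {
            System.out.println("Unconverted header fails with " + e.getClass().getSimpleName());
        }

        // After normalization the same read succeeds.
        MimeType converted = readContentType(normalizeContentType(amqpHeaders));
        System.out.println("Converted header: " + converted);
    }
}
```

In the real handler the conversion would presumably be done with Spring's own `MimeType.valueOf(...)`, or with the accessor's `getContentType()` as in the snippet above, rather than with a hand-written class like this one.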