Here is what I have (in summary)
<int-amqp:inbound-channel-adapter
auto-startup="false"
listener-container="clientListenerContainer"
channel="fromRabbitClientChannel"
connection-factory="rabbitConnectionFactory" />
<bean id="clientListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="rabbitConnectionFactory" />
</bean>
<int:service-activator ref="subscriptionHandler" method="convertMessage" input-channel="fromRabbitClientChannel" output-channel="sendMessage" />
<bean id="stompSubProtocolHandler" class="org.springframework.web.socket.messaging.StompSubProtocolHandler"/>
<int-websocket:outbound-channel-adapter channel="sendMessage"
container="serverWebSocketContainer"
default-protocol-handler="stompSubProtocolHandler"/>
Here is my subscriptionHandler's convertMessage method:
public Message convertMessage(Message message){
String subscriptionQuery = AmqpMessageHeaderAccessor.wrap(message).getReceivedRoutingKey();
ClientSubscription subscription = this.subscriptions.getBySubscriptionQuery(subscriptionQuery);
Message msg = MessageBuilder.withPayload(message.getPayload())
.setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, subscription.getSessionId())
.setHeader(SimpMessageHeaderAccessor.DESTINATION_HEADER, subscription.getSubscriptionQuery())
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID_HEADER, subscription.getSubscriptionId()).build();
return msg;
}
When messages come in through the <int-amqp:inbound-channel-adapter>, via the SimpleMessageListenerContainer, and reach my service activator, the payload is a byte[] and the contentType header is set to application/octet-stream.
Then my convertMessage function creates a message from the payload with some STOMP headers and returns it to the sendMessage channel, where the stompSubProtocolHandler takes over. It is there, when it tries to convert the headers to a STOMP message, that it throws the exception java.lang.String cannot be cast to org.springframework.util.MimeType.
Here is the stack trace.
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType
at org.springframework.messaging.support.MessageHeaderAccessor.getContentType(MessageHeaderAccessor.java:457)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.updateStompHeadersFromSimpMessageHeaders(StompHeaderAccessor.java:169)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.<init>(StompHeaderAccessor.java:127)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.wrap(StompHeaderAccessor.java:478)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.getStompHeaderAccessor(StompSubProtocolHandler.java:402)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageToClient(StompSubProtocolHandler.java:329)
at org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler.handleMessageInternal(WebSocketOutboundMessageHandler.java:151)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
I feel like I am not adding a header that I need, or maybe I need to use a message converter... Or maybe I am just not doing it right altogether.
What do I need to do to the message to prepare it for the stompSubProtocolHandler?
Am I handling messages from AMQP correctly by using an <int-amqp:inbound-channel-adapter> with a SimpleMessageListenerContainer? (I need the container to dynamically add more queues as more subscriptions are made)
Good task though!
Would you mind sharing it as a sample for the community when it is ready, as a pull request to this project: https://github.com/spring-projects/spring-integration-samples?
Since your stack trace says that the contentType is a String, you can just override it in your subscriptionHandler:
AmqpMessageHeaderAccessor amqpMessageHeaderAccessor = AmqpMessageHeaderAccessor.wrap(message);
....
.setHeader(MessageHeaders.CONTENT_TYPE, amqpMessageHeaderAccessor.getContentType())
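To illustrate why the cast fails and why replacing the header value helps, here is a minimal sketch that runs on the plain JDK. It is not the framework's code: `MimeTypeStandIn`, the map-based headers, `getContentTypeByCast` and `normalizeContentType` are hypothetical stand-ins for `org.springframework.util.MimeType`, `MessageHeaders`, and the accessor logic seen in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

public class ContentTypeHeaderSketch {

    // Hypothetical stand-in for org.springframework.util.MimeType.
    static final class MimeTypeStandIn {
        final String type;
        final String subtype;

        MimeTypeStandIn(String type, String subtype) {
            this.type = type;
            this.subtype = subtype;
        }

        // Parses "type/subtype"; parameters such as charset are dropped in this sketch.
        static MimeTypeStandIn valueOf(String value) {
            String[] parts = value.split(";")[0].trim().split("/");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Not a mime type: " + value);
            }
            return new MimeTypeStandIn(parts[0], parts[1]);
        }

        @Override
        public String toString() {
            return type + "/" + subtype;
        }
    }

    static final String CONTENT_TYPE = "contentType";

    // Models the failing step: the header value is cast without a type check.
    static MimeTypeStandIn getContentTypeByCast(Map<String, Object> headers) {
        return (MimeTypeStandIn) headers.get(CONTENT_TYPE);
    }

    // Models the fix: replace a String content type with a typed value
    // before the message reaches code that performs the blind cast.
    static Map<String, Object> normalizeContentType(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object value = copy.get(CONTENT_TYPE);
        if (value instanceof String) {
            copy.put(CONTENT_TYPE, MimeTypeStandIn.valueOf((String) value));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // The inbound AMQP message carries the content type as a plain String.
        headers.put(CONTENT_TYPE, "application/octet-stream");

        try {
            getContentTypeByCast(headers);
        } catch (ClassCastException e) {
            System.out.println("String header: cast fails, as in the stack trace");
        }

        MimeTypeStandIn fixed = getContentTypeByCast(normalizeContentType(headers));
        System.out.println("Typed header: " + fixed);
    }
}
```

In the real handler the same idea is a single `setHeader` call on the builder, as in the snippet above.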
UPDATE
BTW, we recently fixed that contentType ambiguity: https://jira.spring.io/browse/SPR-12730. With the latest Spring Framework (SF) 4.1.5 you no longer need to take care of the contentType conversion. The framework does it automatically.