I'm using a simple Qpid Java client that consumes messages from the broker and forwards them to a SOAP service. On the producer side we put all the data into a map and then send this map to Qpid. Here is a snippet:
QueueSender conBusQueueSender = (QueueSender) Component.getInstance("conBusQueueSender");
QueueSession queueSession = org.jboss.seam.jms.QueueSession.instance();
Map<String,Object> map = new HashMap<String,Object>();
map.put("applicationId", applicationId);
map.put("soapAction", "urn:changeApplicationStatus");
map.put("soapXML", changeApplicationStatusString);
MapMessage message = queueSession.createMapMessage();
message.setObject("map",map);
conBusQueueSender.send(message);
On the client side we receive the message and try to send it to the web service through SOAP:
while (true) {
    MapMessage m = (MapMessage) consumer.receive();
    @SuppressWarnings("unchecked")
    Map<String, Object> map = (Map<String, Object>) m.getObject("map");
    SOAPMessage message = createSoapMessage(map);
    while (true) {
        try {
            SOAPMessage result = makeSoapCall(message, (String)map.get("applicationId"), (String)map.get("soapAction"));
            if(result.getSOAPBody().hasFault()){
                System.out.println("SOAPFault: "+result.getSOAPBody().getFault().getFaultString());
                System.out.println("SOAPMessage tried to send: " + (String)map.get("soapXML"));
            }
            m.acknowledge();
            // if we came here successfully -> break and process next message
            break;
        } catch (SOAPException e) {
            // if we have exception -> sleep and retry
            e.printStackTrace();
            Thread.sleep(1000L);
            continue;
        }
    }
}
Everything works fine, but when the consumer has to process a slightly bigger message (~66KB), it just prints this error:
73 [main] INFO org.apache.qpid.client.security.DynamicSaslRegistrar - Additional SASL providers successfully registered.
87 [main] INFO org.apache.qpid.client.AMQConnection - Connection:amqp://guest:********@localhost/?brokerlist='tcp://localhost:5672'
314 [main] INFO org.apache.qpid.client.protocol.AMQProtocolSession - Using ProtocolVersion for Session:0-10
330 [main] INFO org.apache.qpid.client.handler.ClientMethodDispatcherImpl - New Method Dispatcher:AMQProtocolSession[null]
341 [main] INFO org.apache.qpid.client.AMQConnection - Connecting with ProtocolHandler Version:0-10
414 [IoReceiver - localhost/127.0.0.1:5672] INFO org.apache.qpid.transport.ClientDelegate - The broker does not support the configured connection idle timeout of 120 sec, using the brokers max supported value of 0 sec instead.
420 [main] INFO org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-10
Connection established to amqp://guest:guest@localhost/?brokerlist='tcp://localhost:5672'
443 [main] INFO org.apache.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_10@112c3327
Session created...
476 [main] INFO org.apache.qpid.client.AMQSession - Prefetching delayed existing messages will not flow until requested via receive*() or setML().
Consumer initialized...
481 [main] INFO org.apache.qpid.client.AMQSession.Dispatcher - Dispatcher-Channel-0 created
481 [Dispatcher-Channel-0] INFO org.apache.qpid.client.AMQSession.Dispatcher - Dispatcher-Channel-0 started
492 [Dispatcher-Channel-0] ERROR org.apache.qpid.client.BasicMessageConsumer - Caught exception (dump follows) - ignoring...
java.lang.IllegalArgumentException: unknown code: 105
at org.apache.qpid.transport.codec.AbstractDecoder.getType(AbstractDecoder.java:354)
at org.apache.qpid.transport.codec.AbstractDecoder.readMap(AbstractDecoder.java:287)
at org.apache.qpid.transport.codec.BBDecoder.readMap(BBDecoder.java:34)
at org.apache.qpid.transport.codec.AbstractDecoder.read(AbstractDecoder.java:455)
at org.apache.qpid.transport.codec.AbstractDecoder.readMap(AbstractDecoder.java:288)
at org.apache.qpid.transport.codec.BBDecoder.readMap(BBDecoder.java:34)
at org.apache.qpid.client.message.AMQPEncodedMapMessage.populateMapFromData(AMQPEncodedMapMessage.java:96)
at org.apache.qpid.client.message.JMSMapMessage.<init>(JMSMapMessage.java:71)
at org.apache.qpid.client.message.AMQPEncodedMapMessage.<init>(AMQPEncodedMapMessage.java:52)
at org.apache.qpid.client.message.AMQPEncodedMapMessageFactory.createMessage(AMQPEncodedMapMessageFactory.java:36)
at org.apache.qpid.client.message.AbstractJMSMessageFactory.create010MessageWithBody(AbstractJMSMessageFactory.java:135)
at org.apache.qpid.client.message.AbstractJMSMessageFactory.createMessage(AbstractJMSMessageFactory.java:166)
at org.apache.qpid.client.message.MessageFactoryRegistry.createMessage(MessageFactoryRegistry.java:150)
at org.apache.qpid.client.BasicMessageConsumer_0_10.createJMSMessageFromUnprocessedMessage(BasicMessageConsumer_0_10.java:221)
at org.apache.qpid.client.BasicMessageConsumer_0_10.createJMSMessageFromUnprocessedMessage(BasicMessageConsumer_0_10.java:47)
at org.apache.qpid.client.BasicMessageConsumer.notifyMessage(BasicMessageConsumer.java:693)
at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:205)
at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:47)
at org.apache.qpid.client.AMQSession$Dispatcher.notifyConsumer(AMQSession.java:3392)
at org.apache.qpid.client.AMQSession$Dispatcher.dispatchMessage(AMQSession.java:3336)
at org.apache.qpid.client.AMQSession$Dispatcher.access$900(AMQSession.java:3117)
at org.apache.qpid.client.AMQSession.dispatch(AMQSession.java:3110)
at org.apache.qpid.client.message.UnprocessedMessage.dispatch(UnprocessedMessage.java:55)
at org.apache.qpid.client.AMQSession$Dispatcher.run(AMQSession.java:3264)
at java.lang.Thread.run(Thread.java:680)
And there are no errors like "Enqueue capacity threshold exceeded on queue "bus-status-queue"". What is wrong?
This is due to a restriction in the AMQP protocol on string size: str16 is limited to 64K (a 16-bit length, so at most 65,535 bytes). The code that enforces this restriction comes into play when data structures such as maps and lists are used to hold the strings. With a simple TextMessage, the text is treated as raw data.
The solution is to use byte[] instead, which is encoded as vbin32 and can hold up to 4GB of data.
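As a rough sketch of that change, the snippet below stores the SOAP XML in the map as a byte[] rather than a String and converts it back on the consumer side. The class and method names (SoapPayloadCodec, fitsInStr16, buildPayload, readSoapXml) are made up for illustration, UTF-8 is an assumed encoding, and the sketch relies on the client encoding byte[] map values as vbin32 as described above. It uses only the JDK, so the JMS send and receive calls from the question are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Qpid or JMS.
final class SoapPayloadCodec {
    // Largest byte length that a 16-bit length prefix (str16) can describe.
    static final int STR16_MAX_BYTES = 0xFFFF;

    // True if the UTF-8 form of the string would fit into a str16 field.
    static boolean fitsInStr16(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length <= STR16_MAX_BYTES;
    }

    // Producer side: same keys as in the question, but soapXML is a byte[].
    static Map<String, Object> buildPayload(String applicationId, String soapAction, String soapXml) {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("applicationId", applicationId);
        map.put("soapAction", soapAction);
        map.put("soapXML", soapXml.getBytes(StandardCharsets.UTF_8));
        return map;
    }

    // Consumer side: turn the byte[] back into the XML string.
    static String readSoapXml(Map<String, Object> map) {
        return new String((byte[]) map.get("soapXML"), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Build a payload of roughly the size that failed in the question.
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 66 * 1024) {
            sb.append("<status>ok</status>");
        }
        String xml = sb.toString();

        System.out.println("fits in str16: " + fitsInStr16(xml));
        Map<String, Object> map = buildPayload("42", "urn:changeApplicationStatus", xml);
        System.out.println("round trip intact: " + xml.equals(readSoapXml(map)));
    }
}
```

On the producer, the map returned by buildPayload would take the place of the map passed to message.setObject("map", map). On the consumer, readSoapXml(map) would replace the (String) map.get("soapXML") cast, and createSoapMessage would need the same adjustment if it reads that key.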