How to change properties of AMQ endpoint?


When I create an ActiveMQ queue through the console, it already has predefined property values.

For example:

Max message size    3301921
Min message size    1091

The same happens when I create a queue in the client code:

...
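ConnectionFactory connFactory =
    new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
// The session below needs a connection obtained from the factory.
Connection connection = connFactory.createConnection();
ActiveMQSession session =
    (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);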
MessageProducer msgProducer = session.createProducer(session.createQueue(queueName));
...

I have a simple Camel route that takes the serialized files from the queue and sends them to an external resource:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:alfresco-queue")
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
                multipartEntityBuilder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
                multipartEntityBuilder.addPart("file", new ByteArrayBody(exchange.getIn().getBody(byte[].class), 
                        exchange.getIn().getHeader("fileName", String.class)));
                exchange.getIn().setBody(multipartEntityBuilder.build().getContent());              
            }
        })

        .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.POST))
        .to("http4://vm-alfce5-31.....com:8080/alfresco/s/someco/queuefileuploader?guest=true")

        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println("The response code is: " + 
                        exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE));
            }
        });
    }
}

My OSGi bundle runs in JBoss FUSE ESB.

When I send a file whose size exceeds the predefined maximum size, I get the following error:

Failed delivery for (MessageId: ID:63-DP-TAV-51262-1531978204588-1:1:1:1:1 
on ExchangeId: ID-63-DP-TAV-64708-1531973651576-0-3). 
Exhausted after delivery attempt: 
1 caught: org.apache.http.ContentTooLongException: 
Content length is too long: 3301060

The stack trace is shown below:

org.apache.http.ContentTooLongException: Content length is too long: 3301060
  at org.apache.http.entity.mime.MultipartFormEntity.getContent(MultipartFormEntity.java:103)[commons-codec:commons-codec:1.9 org.apache.httpcomponents:fluent-hc:4.5.2 org.apache.httpcomponents:httpclient-cache:4.5.2 org.apache.httpcomponents:httpclient-osgi:4.5.2 org.apache.httpcomponents:httpclient:4.5.2 org.apache.httpcomponents:httpmime:4.5.2]
  at org.fusesource.example.MyRouteBuilder$2.process(MyRouteBuilder.java:37)[org.fusesource.example:camel-basic:1.0-SNAPSHOT]
  at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:468)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:91)[org.apache.camel:camel-core:2.17.0.redhat-630187 com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2]
  at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)[org.apache.camel:camel-jms:2.17.0.redhat-630187]
  at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:555)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:515)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:485)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)[org.apache.servicemix.bundles:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
  at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]

How can I change the properties of the ActiveMQ queue?


Solution

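  • That error comes from the HTTP side of the route, not from the ActiveMQ queue. The stack trace shows org.apache.http.ContentTooLongException being thrown from MultipartFormEntity.getContent, which is called by the processor in MyRouteBuilder.

    I don't recommend changing the maximum message size on the broker to suit a single message type.

    Instead, I recommend:

    1. Increase the maximum size on the HTTP service.
    2. Check the size of each payload as it comes off the queue, and handle oversized messages accordingly.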
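
The size check in point 2 could look roughly like the sketch below. It is a minimal illustration, not a definitive implementation: the class name PayloadSizeGuard, its accepts method, and the 1 MiB limit are all hypothetical, and the right limit depends on what the HTTP service is configured to accept. It uses only the JDK so that it can be run on its own.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decides whether a payload taken off the queue is small enough to forward. */
public final class PayloadSizeGuard {

    private final long maxBytes;

    public PayloadSizeGuard(long maxBytes) {
        if (maxBytes < 0) {
            throw new IllegalArgumentException("maxBytes must not be negative");
        }
        this.maxBytes = maxBytes;
    }

    /** Returns true when the payload is non-null and no larger than the configured limit. */
    public boolean accepts(byte[] payload) {
        return payload != null && payload.length <= maxBytes;
    }

    public static void main(String[] args) {
        // Assumed limit of 1 MiB; replace it with the limit the HTTP service really enforces.
        PayloadSizeGuard guard = new PayloadSizeGuard(1024L * 1024L);

        byte[] small = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] large = new byte[3301060]; // the content length reported in the error

        System.out.println("small accepted: " + guard.accepts(small));
        System.out.println("large accepted: " + guard.accepts(large));
    }
}
```

In the route, a check like this would probably run at the start of the first processor (or in a Camel filter or choice step) on exchange.getIn().getBody(byte[].class), before the multipart body is built. What "handle accordingly" means for rejected messages is a design decision: for example, logging them or moving them to a separate queue.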