I'm new to Apache ActiveMQ and have some trouble with sending and storing big amount of big messages (biggest ones are around 100mb) in ActiveMQ. Messages are persistent, so as I know they store at the hard drive, not in memory. But strangely the ActiveMQ crashes when the database size (KahaDB folder) reaches 2.8Gb (70% of 4Gb jvm heap). Seems like ActiveMQ still store all messages in memory. I need firstly to store all messages, then I will be consuming them all.
My config activemq.xml
<policyEntry queue=">" producerFlowControl="false" memoryLimit="100MB">
<pendingQueuePolicy>
<fileQueueCursor />
</pendingQueuePolicy>
</policyEntry>
...
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
...
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=204857600"/>
</transportConnectors>
And that how I send messages in java. Try-with-resources closes all.
try (Connection conn = queueConnFactory.createConnection();
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(getTargetQueue())) {
ObjectMessage message = session.createObjectMessage(transferMessage);
producer.send(message);
}
And this is activeMQ log when it crashes last time.
ERROR | Forwarding of acks failed | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
java.lang.OutOfMemoryError: Java heap space
2019-08-02 20:32:12,421 | INFO | Ignoring no space left exception, java.io.IOException: Java heap space | org.apache.activemq.util.DefaultIOExceptionHandler | ActiveMQ Journal Checkpoint Worker
java.io.IOException: Java heap space
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2075)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:12,514 | WARN | Async error occurred: java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///10.11.34.224:58668@61616
2019-08-02 20:37:44,899 | ERROR | Forwarding of acks failed | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
java.lang.IllegalArgumentException: Self-suppression not permitted
at java.lang.Throwable.addSuppressed(Unknown Source)[:1.8.0_211]
at org.apache.activemq.store.kahadb.MessageDatabase.forwardAllAcks(MessageDatabase.java:2132)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase.access$700(MessageDatabase.java:121)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2068)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:44,899 | INFO | Stopping BrokerService[localhost] due to exception, java.io.IOException: Self-suppression not permitted | org.apache.activemq.util.DefaultIOExceptionHandler | ActiveMQ Journal Checkpoint Worker
java.io.IOException: Self-suppression not permitted
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2075)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:44,946 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) is shutting down | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:45,133 | INFO | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:45,180 | INFO | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:46,974 | INFO | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:46,974 | INFO | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:48,316 | INFO | Connector ws stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:48,955 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] stopped | org.apache.activemq.store.kahadb.plist.PListStoreImpl | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopping async queue tasks | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopping async topic tasks | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopped KahaDB | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:51,404 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) uptime 1 hour 32 minutes | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:58,502 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) is shutdown | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:58,534 | INFO | Closing org.apache.activemq.xbean.XBeanBrokerFactory$1@14d57a4: startup date [Fri Aug 02 19:05:26 MSK 2019]; root of context hierarchy | org.apache.activemq.xbean.XBeanBrokerFactory$1 | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:38:00,296 | INFO | Destroying Spring FrameworkServlet 'dispatcher' | /admin | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:38:00,764 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@71d9564d: startup date [Fri Aug 02 20:38:00 MSK 2019]; root of context hierarchy | org.apache.activemq.xbean.XBeanBrokerFactory$1 | WrapperSimpleAppMain
2019-08-02 20:38:03,619 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\ActiveMQ\bin\win64\..\..\data\kahadb] | org.apache.activemq.broker.BrokerService | WrapperSimpleAppMain
2019-08-02 20:38:04,196 | INFO | ignoring zero length, partially initialised journal data file: db-404.log number = 404 , length = 0 | org.apache.activemq.store.kahadb.disk.journal.Journal | WrapperSimpleAppMain
2019-08-02 20:38:05,819 | INFO | KahaDB is version 6 | org.apache.activemq.store.kahadb.MessageDatabase | WrapperSimpleAppMain
2019-08-02 20:38:06,177 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] started | org.apache.activemq.store.kahadb.plist.PListStoreImpl | WrapperSimpleAppMain
2019-08-02 20:38:06,177 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:2) is starting | org.apache.activemq.broker.BrokerService | WrapperSimpleAppMain
2019-08-02 20:38:10,374 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] initialized | org.apache.activemq.store.kahadb.plist.PListStoreImpl | WrapperSimpleAppMain
2019-08-02 20:38:26,925 | WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.xbean.XBeanBrokerFactory$1 | WrapperSimpleAppMain
2019-08-02 20:38:26,925 | ERROR | Failed to load: class path resource [activemq.xml], reason: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.xbean.XBeanBrokerFactory | WrapperSimpleAppMain
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1634)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)[xbean-spring-4.2.jar:4.2]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)[xbean-spring-4.2.jar:4.2]
at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)[activemq-console-5.15.9.jar:5.15.9]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_211]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at java.lang.reflect.Method.invoke(Unknown Source)[:1.8.0_211]
at org.apache.activemq.console.Main.runTaskClass(Main.java:262)[activemq.jar:5.15.9]
at org.apache.activemq.console.Main.main(Main.java:115)[activemq.jar:5.15.9]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_211]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at java.lang.reflect.Method.invoke(Unknown Source)[:1.8.0_211]
at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)[wrapper.jar:3.2.3]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
At this point I don't see any evidence of a memory leak. Just because the broker ran out of memory doesn't mean there is a leak. You could simply not be giving the JVM enough heap for your use-case.
Also, just because the broker uses a database of sorts (i.e. KahaDB) to store the messages to disk doesn't mean the broker is much like a database. The broker will still store as much message data in memory as it can despite the fact that messages are persisted to disk contrary to your assertion. It would be extremely inefficient to purge message data from memory once it's written to disk then immediately re-load it from disk as soon as a consumer needs it. There is also the matter of non-persistent messages that the broker must track without writing them to disk.
If you're just going to be storing all the data and then retrieving it later it might be best for you to simply use a database as a database would be better suited to such a use-case. A message broker is usually better suited to use cases where consumers are active with producers concurrently and messages don't build up on the broker but are dispatched to consumers soon after they arrive.
Your code snippet is potentially an anti-pattern. JMS connections are thread-safe and are meant to be re-used. Opening and closing a connection for every message sent is an anti-pattern if, in fact, the application does any other JMS work aside from sending a single, solitary message.
Lastly, you may consider moving to ActiveMQ Artemis (i.e. the next generation of ActiveMQ) as it has direct support for "large" messages without requiring the use of an out-of-band transfer mechanism or implementation objects as required with the "blob message" support in ActiveMQ "Classic."