Search code examples
javamultithreadingrabbitmqthreadpoolout-of-memory

RabbitMQ java.lang.OutOfMemoryError


Firstly I apologise if this question has been handled elsewhere, I just haven't found something that solved my particular problem.

I have a gateway server which receives msgs externally. It puts it in a queue that my Order processing server is listening on. My order processing server listens on 2 queues (in a thread). Queue 1 - gateway server, Queue 2 - clearing server.

So in my order processor, I have worker threads. I'm using ExecutorService to manage my threads. The problem is in the worker thread.

In the worker thread I make two instance of MQs that I use to publish message to either the clearing server or the gateway server. I basically need to do some processing and then publish that message to these queues.

What I want to know is, should I close the channel and connection in my worker thread each time I finish dealing with a message?

If I don't close the MQ connections on every worker thread after processing a message then after processing 8-900 messages, I start getting following exception intermittently :

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
    at interfaces.MQ.<init>(MQ.java:41)
    at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45)
    at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47)

If I do close the connections on every worker thread after processing a msg then after a awhile I get following exception intermittently :]

Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108)
    at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94)
    at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)

I am using basicQos when I create a MQ connection for consumption to keep RabbitMQ's internal queue reasonable.

I create my MQ connections for consumption in following manner :

_channel.queueDeclare(this._mqName.toString(), true, false, false, null);
_channel.basicConsume(this._mqName.toString(), true, _consumer);
_channel.basicQos(50);

Thanks for looking at this and any suggestions or help would be much appreciated. it is more than likely that I'm not doing things correctly for my context..


Solution

  • Thanks for the input guys. I resolved the issue.

    I was creating connection in every worker thread. Now I create the connection on the main thread and pass that down to worker threads who create their channels from that connection. this seems to work a treat.

    This however means, I'll have to re-design my MQ class to handle this workflow.