Firstly I apologise if this question has been handled elsewhere, I just haven't found something that solved my particular problem.
I have a gateway server which receives msgs externally. It puts it in a queue that my Order processing server is listening on. My order processing server listens on 2 queues (in a thread). Queue 1 - gateway server, Queue 2 - clearing server.
So in my order processor, I have worker threads. I'm using ExecutorService to manage my threads. The problem is in the worker thread.
In the worker thread I make two instance of MQs that I use to publish message to either the clearing server or the gateway server. I basically need to do some processing and then publish that message to these queues.
What I want to know is, should I close the channel and connection in my worker thread each time I finish dealing with a message?
If I don't close the MQ connections on every worker thread after processing a message then after processing 8-900 messages, I start getting following exception intermittently :
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
at interfaces.MQ.<init>(MQ.java:41)
at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45)
at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47)
If I do close the connections on every worker thread after processing a msg then after a awhile I get following exception intermittently :]
Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108)
at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94)
at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
I am using basicQos
when I create a MQ connection for consumption to keep RabbitMQ's internal queue reasonable.
I create my MQ connections for consumption in following manner :
_channel.queueDeclare(this._mqName.toString(), true, false, false, null);
_channel.basicConsume(this._mqName.toString(), true, _consumer);
_channel.basicQos(50);
Thanks for looking at this and any suggestions or help would be much appreciated. it is more than likely that I'm not doing things correctly for my context..
Thanks for the input guys. I resolved the issue.
I was creating connection in every worker thread. Now I create the connection on the main thread and pass that down to worker threads who create their channels from that connection. this seems to work a treat.
This however means, I'll have to re-design my MQ class to handle this workflow.