I have a multithreaded application written in Java using jeromq 0.3.2. I'm working on putting it into the Tanuki service wrapper so that it runs as a Windows service, and I'm having some trouble with cleanly stopping the threads. The run() method of the proxy is a variation on the Simple Pirate pattern from the guide, and looks like this:
public void run()
{
    ZContext context = new ZContext();
    Socket frontend = context.createSocket(ZMQ.ROUTER);
    Socket backend = context.createSocket(ZMQ.ROUTER);
    frontend.bind("tcp://*:5555");
    backend.bind("inproc://workers");
    while (!Thread.currentThread().isInterrupted())
    {
        ZMQ.Poller poller = new ZMQ.Poller(2);
        poller.register(frontend, ZMQ.Poller.POLLIN);
        poller.register(backend, ZMQ.Poller.POLLIN);
        poller.poll();
        if (poller.pollin(0))
        {
            processFrontend(frontend, backend, context);
        }
        if (poller.pollin(1))
        {
            processBackend(frontend, backend);
        }
    }
    // additional code to close worker threads
}
How can I cleanly exit from this loop when the controlling wrapper wants to stop the application?
If there are no clients currently connected, then the loop is blocked at the call to poller.poll(), so if the wrapper calls interrupt() on the thread it is ignored. If there are clients currently sending in messages, then a call to interrupt() causes the poller.poll() method to throw a zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException.
I've also tried to use:
PollItem[] items = {
    new PollItem(frontend, Poller.POLLIN),
    new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);
if (rc == -1)
    break;
if (items[0].isReadable())
{
    processFrontend(frontend, backend, context);
}
if (items[1].isReadable())
{
    processBackend(frontend, backend);
}
but the call to ZMQ.poll exhibits the same behaviour. Two possible alternatives are:
1. ZMQ.poll and wrap the content of the run() method in a try/catch for the IOException.
2. processFrontend and cause the code to break out of the loop.
The first seems a bit dirty, and the second feels a bit fragile. Are there any other approaches I should consider? Is there some other polling method I can use that reacts more cleanly to a thread interrupt?
It's better to move the loop into a separate thread and open an additional inproc PULL socket, so that the loop polls for messages on three sockets: frontend, backend and control. When the main thread receives an interrupt signal, it should send a STOP command to the proxy loop.
Exactly this pattern is implemented in my project: https://github.com/thriftzmq/thriftzmq-java/blob/master/thriftzmq/src/main/java/org/thriftzmq/ProxyLoop.java
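The idea is not specific to ZeroMQ. As a rough, runnable illustration (an analogy in plain java.nio, not the code from the project linked above), the sketch below uses a Selector in place of the poller and a Pipe in place of the inproc control socket. The names ControlledLoop, STOP, stop() and demo() are invented for this sketch, and the frontend and backend channels are left out to keep it short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical names throughout: a java.nio stand-in for the ZeroMQ proxy loop.
class ControlledLoop implements Runnable {
    static final byte STOP = 1;                  // made-up one-byte STOP command

    private final Selector selector;             // stands in for the poller
    private final Pipe control;                  // stands in for the inproc control socket
    private volatile boolean stoppedByCommand = false;

    ControlledLoop() {
        try {
            selector = Selector.open();
            control = Pipe.open();
            control.source().configureBlocking(false);
            control.source().register(selector, SelectionKey.OP_READ);
            // A real proxy would also register its frontend and backend channels here.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1);
        try {
            while (true) {
                selector.select();               // blocks until some channel is readable
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.channel() == control.source()) {
                        buf.clear();
                        int n = control.source().read(buf);
                        if (n > 0 && buf.get(0) == STOP) {
                            stoppedByCommand = true;
                            return;              // leave the loop; cleanup runs in finally
                        }
                        if (n < 0) {
                            return;              // control channel closed without a STOP
                        }
                    }
                    // Keys for the frontend and backend channels would be handled here.
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
                control.source().close();
            } catch (IOException ignored) {
                // nothing useful to do during shutdown in this sketch
            }
        }
    }

    // Called from another thread: send STOP, then close the sending end.
    void stop() {
        try (Pipe.SinkChannel sink = control.sink()) {
            sink.write(ByteBuffer.wrap(new byte[] { STOP }));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts the loop, sends STOP, and reports whether the loop exited because of it.
    static boolean demo() {
        ControlledLoop loop = new ControlledLoop();
        Thread worker = new Thread(loop, "proxy-loop");
        worker.start();
        loop.stop();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && loop.stoppedByCommand;
    }

    public static void main(String[] args) {
        System.out.println("stopped by command: " + demo());
    }
}
```

With JeroMQ the shape would be the same: the control socket is registered with the poller next to frontend and backend, and the loop exits when a STOP message arrives on it instead of relying on interrupt().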
On the main thread you may add a shutdownHook and send the STOP command to the loop from within it:
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        try {
            // Connect to controlSocket and send STOP command
            ZMQ.Socket peer = context.socket(ZMQ.PUSH);
            peer.connect(CONTROL_ENDPOINT);
            peer.send(STOP_MESSAGE, 0);
            peer.close();
        } catch (Exception ex) {
            logger.error("Failed to stop service", ex);
            // System.exit() blocks forever when called from a shutdown hook,
            // so halt() is used to terminate with a failure code instead.
            Runtime.getRuntime().halt(1);
        }
    }
});
P.S. Using the service primitives from the Guava library to manage the service lifecycle makes life a lot easier.