
How to shut down JeroMQ correctly


I am wondering how to shut down JeroMQ properly. So far I know three methods, each with its pros and cons, and I have no clue which one is best.

The situation:

  • Thread A: owns context, shall provide start/stop methods
  • Thread B: actual listener thread

My current method:

Thread A

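static ZContext CONTEXT = new ZContext();
B worker;
Thread thread;

public void start() {
    worker = new B();
    // Thread.start() returns void, so keep the Thread reference separately
    thread = new Thread(worker);
    thread.start();
}

public void stop() throws InterruptedException {
    worker.stopping = true; // the flag lives on the Runnable B, not on Thread
    thread.join();
}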

Thread B

volatile boolean stopping = false; // volatile: written by thread A, read by thread B
ZMQ.Socket socket;

public void run() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    socket.setReceiveTimeout(10);

    while (!stopping) {
        socket.recv();
    }

    if (NUM_SOCKETS >= 1) {
        CONTEXT.destroySocket(socket);
    } else {
        CONTEXT.destroy();
    }
}

This works just fine. Taking up to 10 ms to shut down is no problem for me, but waking up every 10 ms unnecessarily increases the CPU load when no messages are received. At the moment I prefer this one.


The second method shares the socket between the two threads:

Thread A

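static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket;
B worker;
Thread thread;

public void start() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    worker = new B(socket);
    // Thread.start() returns void, so keep the Thread reference separately
    thread = new Thread(worker);
    thread.start();
}

public void stop() {
    worker.stopping = true; // the flag lives on the Runnable B, not on Thread
    CONTEXT.destroySocket(socket);
}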

Thread B

volatile boolean stopping = false; // volatile: written by thread A, read by thread B
ZMQ.Socket socket;

public void run() {
    try {
        while (!stopping) {
            socket.recv();
        }
    } catch (ClosedSelectorException e) {
        // socket closed by A
        socket = null;
    }
    if (socket != null) {
        // close socket myself
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }
}

This works like a charm, too, but sometimes the exception is not thrown even though recv is already blocking. If I wait one millisecond after I started thread A, the exception is always thrown. I don't know whether this is a bug or just a side effect of my misuse, since I share the socket between threads.


"revite" asked this question before (https://github.com/zeromq/jeromq/issues/116) and got an answer which is the third solution: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java

Summary: They call ctx.term() and interrupt the thread blocking in socket.recv().

This works fine, but I do not want to terminate my whole context, just this single socket. I would have to use one context per socket, so I would not be able to use inproc.

Summary

At the moment I have no clue how to get thread B out of its blocking state other than using timeouts, sharing the socket or terminating the whole context.

What is the correct way of doing this?


Solution

  • It is often said that you can just destroy the ZeroMQ context and everything sharing that context will exit. However, this creates a nightmare, because your exiting code has to work hard to avoid accidentally calling into dead socket objects.

    Closing the socket from another thread doesn't work either: sockets are not thread-safe, and you'll end up with crashes.

    ANSWER: The best way is to do what the ZeroMQ guide suggests for any use across multiple threads: signal between threads with ZeroMQ sockets, not with mutexes, locks and the like. Set up an additional listener socket that you connect to and send a message to on shutdown. Your run() should then use a JeroMQ Poller to check which of the two sockets has received something, and exit if it is the additional socket.
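
    The approach described above could look roughly like the following. This is a minimal sketch rather than a definitive implementation. It assumes a recent JeroMQ (0.5 or later, for SocketType), a PAIR socket pair over inproc as the control channel, and that start() and stop() are called from the same thread. The class name StoppableListener and both endpoint names are hypothetical.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/** Sketch: thread A's handle for a listener thread that is stopped via a control socket. */
public class StoppableListener {
    // Hypothetical endpoint names, chosen for illustration only.
    private static final String CONTROL = "inproc://listener-control";
    private static final String DATA = "inproc://listener-data";

    private final ZContext context;
    private ZMQ.Socket signal; // created, used and closed by thread A only
    private Thread thread;

    public StoppableListener(ZContext context) {
        this.context = context;
    }

    public void start() {
        // Bind before the thread starts, so thread B's connect always finds the endpoint.
        signal = context.createSocket(SocketType.PAIR);
        signal.bind(CONTROL);
        thread = new Thread(this::listen, "listener");
        thread.start();
    }

    public void stop() {
        signal.send("STOP"); // wakes the poller in thread B
        try {
            thread.join();   // wait until B has closed its own sockets
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        context.destroySocket(signal);
    }

    public boolean isRunning() {
        return thread != null && thread.isAlive();
    }

    // Runs in thread B; both sockets below are created, used and closed here only.
    private void listen() {
        ZMQ.Socket data = context.createSocket(SocketType.ROUTER);
        ZMQ.Socket control = context.createSocket(SocketType.PAIR);
        data.bind(DATA);
        control.connect(CONTROL);

        ZMQ.Poller poller = context.createPoller(2);
        int dataIndex = poller.register(data, ZMQ.Poller.POLLIN);
        int controlIndex = poller.register(control, ZMQ.Poller.POLLIN);
        try {
            while (true) {
                // Block without a timeout until one of the two sockets is readable.
                if (poller.poll(-1) < 0) {
                    break; // polling failed, e.g. the context is going away
                }
                if (poller.pollin(controlIndex)) {
                    control.recv(); // consume the stop signal and leave the loop
                    break;
                }
                if (poller.pollin(dataIndex)) {
                    ZMsg message = ZMsg.recvMsg(data); // all frames of one message
                    // ... handle the message here
                    message.destroy();
                }
            }
        } finally {
            poller.close();
            context.destroySocket(control);
            context.destroySocket(data);
        }
    }
}
```

    With this layout no socket is ever touched by two threads, the context stays alive for other sockets, and the listener blocks without a timeout, so it uses no CPU while idle. Thread A would call new StoppableListener(CONTEXT).start() and later stop() on the same object.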