
Does ZMQ have a mechanism to tell whether a connection is still alive?


I have a ZMQ server that performs a heavy computation, so it can take several minutes before it sends the result back to the client via server_socket.send(). The client should wait indefinitely for the result of the computation. However, if the connection drops during the computation, the client should find another server to connect to.

I know this could be implemented manually with a background thread on the server that keeps sending "please wait" messages to the client until the result is ready. This way, the client can set client_socket.RCVTIMEO = 1000 so that recv() raises zmq.Again if the server does not respond within 1 second.

However, I'm wondering whether there is a built-in mechanism in ZMQ for this, because it already uses background threads itself to send and receive messages. Is there a mechanism in ZMQ to tell whether a connection is still alive even though the server code has not called server_socket.send() in a while?

Here is the manual solution (which, for now, only works with a single client) that I would like to simplify:

import threading
import time

import zmq


def server():

  context = zmq.Context.instance()
  socket = context.socket(zmq.ROUTER)
  socket.bind('tcp://*:5555')

  while True:

    identity, _, message = socket.recv_multipart()
    print('Received request from client')

    print('Start telling the client to wait')
    waiting = True
    def say_wait():
      while waiting:
        socket.send_multipart([identity, b'', b'wait'])
        time.sleep(0.1)
        # TODO: This also needs to get a message from the same client, not any.
        _, _, message = socket.recv_multipart()
        assert message == b'alright', message

    thread = threading.Thread(target=say_wait)
    thread.start()

    print('Perform heavy server computation')
    time.sleep(3)

    print('Stop telling the client to wait')
    waiting = False
    thread.join()

    print('Send the result to the client')
    socket.send_multipart([identity, b'', b'result'])


def client():

  socket = None

  while True:

    if not socket:
      print('Client finds a new server to connect to')
      context = zmq.Context.instance()
      socket = context.socket(zmq.REQ)
      socket.RCVTIMEO = 1000  # 1 second timeout.
      address = find_available_server()
      socket.connect(f'tcp://{address}')

    socket.send(b'request')
    try:
      while True:
        message = socket.recv()
        if message == b'wait':
          print('Client heard that it should wait longer')
          socket.send(b'alright')
          continue
        else:
          print(f'Client got the result of the computation: {message}')
          break
    except zmq.Again:
      print('Client did not hear back from the server')
      socket.close(linger=0)
      socket = None


def find_available_server():
  # In practice, this function asks a central coordinator for
  # the IP address of an available server.
  return 'localhost:5555'


threading.Thread(target=server).start()
threading.Thread(target=client).start()
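The client above relies on two REQ-socket behaviours: with RCVTIMEO set, recv() raises zmq.Again once the timeout expires, and after such a timeout the REQ socket refuses to send again (errno EFSM), which is why the client closes it and creates a new one. A small self-contained check of both, using an in-process REP socket that never replies as a stand-in for an unresponsive server. The function name demo_rcvtimeo and the inproc://silent-server endpoint are made up for this sketch:

```python
import zmq


def demo_rcvtimeo():
    """Return (recv timed out, REQ socket stuck afterwards)."""
    context = zmq.Context()

    # Hypothetical stand-in for a server that receives requests but never answers.
    silent = context.socket(zmq.REP)
    silent.bind('inproc://silent-server')

    client = context.socket(zmq.REQ)
    client.RCVTIMEO = 200  # milliseconds; recv() raises zmq.Again after this long
    client.connect('inproc://silent-server')
    client.send(b'request')

    timed_out = False
    try:
        client.recv()
    except zmq.Again:
        timed_out = True

    # The REQ socket still expects a reply, so a second send is rejected with EFSM.
    stuck = False
    try:
        client.send(b'request')
    except zmq.ZMQError as error:
        stuck = error.errno == zmq.EFSM

    client.close(linger=0)
    silent.close(linger=0)
    context.term()
    return timed_out, stuck
```

demo_rcvtimeo() should return (True, True). Closing and recreating the REQ socket after a timeout, as the client above does, is what the ZeroMQ guide calls the Lazy Pirate pattern.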

Solution

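  • You need to look at the ZMQ_HEARTBEAT_* socket options (ZMQ_HEARTBEAT_IVL, ZMQ_HEARTBEAT_TIMEOUT and ZMQ_HEARTBEAT_TTL, added in libzmq 4.2), together with a socket monitor. Turn heartbeats on and the library will regularly send PING commands over the connection and expect PONG replies, regardless of whether your own code is sending anything. If one end unexpectedly disappears, the connection times out and the surviving end learns about it through a ZMQ_EVENT_DISCONNECTED event on the socket monitor.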

    These options certainly exist in the C API. pyzmq is a wrapper around libzmq, so it exposes them as well (zmq.HEARTBEAT_IVL, zmq.HEARTBEAT_TIMEOUT and zmq.HEARTBEAT_TTL), and socket.get_monitor_socket() gives you the monitor. Because the PING/PONG exchange is part of the ZMTP 3.1 protocol standard, it also works between pyzmq and other implementations that follow that standard.
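A minimal pyzmq sketch of the heartbeat-plus-monitor idea on the client side, assuming pyzmq built against libzmq 4.2 or later. The helper names make_client_socket, wait_for_reply and close_client and the 1 s / 3 s intervals are illustrative assumptions, not part of the answer. The client enables heartbeats on its REQ socket and polls both the REQ socket and a monitor socket, so it can wait indefinitely for the reply but stops as soon as the connection is reported dead:

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def make_client_socket(context, endpoint):
    """Create a REQ socket with ZMTP heartbeats and a disconnect monitor."""
    socket = context.socket(zmq.REQ)
    # Heartbeat options are in milliseconds and need libzmq >= 4.2.
    socket.setsockopt(zmq.HEARTBEAT_IVL, 1000)      # send a PING every second
    socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, 3000)  # drop the connection if no traffic within 3 s of a PING
    socket.setsockopt(zmq.HEARTBEAT_TTL, 3000)      # ask the server to drop it after 3 s of silence from us
    # PAIR socket that receives only DISCONNECTED events for this socket.
    monitor = socket.get_monitor_socket(zmq.EVENT_DISCONNECTED)
    socket.connect(endpoint)
    return socket, monitor


def wait_for_reply(socket, monitor):
    """Block until a reply arrives (return it) or the connection drops (return None)."""
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(monitor, zmq.POLLIN)
    while True:
        ready = dict(poller.poll())  # no timeout: wait as long as the connection lives
        if socket in ready:
            return socket.recv()
        if monitor in ready:
            event = recv_monitor_message(monitor)
            if event['event'] == zmq.EVENT_DISCONNECTED:
                return None


def close_client(socket, monitor):
    """Tear down the monitor and the REQ socket, e.g. before trying another server."""
    socket.disable_monitor()
    monitor.close(linger=0)
    socket.close(linger=0)
```

The PING/PONG frames are handled by libzmq's own I/O thread, so the server should keep answering them while its Python code is busy with the computation, and, as far as I know, a libzmq peer answers PINGs even if it has not enabled heartbeats itself. That would make the "please wait" thread unnecessary. When wait_for_reply() returns None, the REQ socket is still waiting for a reply that will never come, so the client would call close_client() and connect to the next address from find_available_server().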