I have a ZMQ server that performs a heavy computation, so sending the result back to the client via server_socket.send() can take several minutes. The client should wait indefinitely for the result of the computation. However, if the connection drops during the computation, the client should find another server to connect to.
I know this could be implemented manually by using a background thread on the server that keeps sending "please wait" messages to the client until the result is ready. This way, the client can set client_socket.RCVTIMEO = 1000 to raise zmq.Again if the server does not respond within 1 second.
However, I'm wondering whether there is a built-in mechanism in ZMQ for this, because it already uses background threads itself to send and receive messages. Is there a mechanism in ZMQ to tell whether a connection is still alive even though the server code has not called server_socket.send() in a while?
Here is the manual solution (which also only works for the case of a single client for now) that I would like to simplify:
import threading
import time
import zmq

def server():
    context = zmq.Context.instance()
    socket = context.socket(zmq.ROUTER)
    socket.bind('tcp://*:5555')
    while True:
        identity, _, message = socket.recv_multipart()
        print('Received request from client')
        print('Start telling the client to wait')
        waiting = True
        def say_wait():
            while waiting:
                socket.send_multipart([identity, b'', b'wait'])
                time.sleep(0.1)
                # TODO: This also needs to get a message from the same client, not any.
                _, _, message = socket.recv_multipart()
                assert message == b'alright', message
        thread = threading.Thread(target=say_wait)
        thread.start()
        print('Perform heavy server computation')
        time.sleep(3)
        print('Stop telling the client to wait')
        waiting = False
        thread.join()
        print('Send the result to the client')
        socket.send_multipart([identity, b'', b'result'])

def client():
    socket = None
    while True:
        if not socket:
            print('Client finds a new server to connect to')
            context = zmq.Context.instance()
            socket = context.socket(zmq.REQ)
            socket.RCVTIMEO = 1000  # 1 second timeout.
            address = find_available_server()
            socket.connect(f'tcp://{address}')
        socket.send(b'request')
        try:
            while True:
                message = socket.recv()
                if message == b'wait':
                    print('Client heard that it should wait longer')
                    socket.send(b'alright')
                    continue
                else:
                    print(f'Client got the result of the computation: {message}')
                    break
        except zmq.Again:
            print('Client did not hear back from the server')
            socket.close(linger=0)
            socket = None

def find_available_server():
    # In practice, this function asks a central coordinator for
    # the IP address of an available server.
    return 'localhost:5555'

threading.Thread(target=server).start()
threading.Thread(target=client).start()
You need to look at the ZMQ_HEARTBEAT socket options (ZMQ_HEARTBEAT_IVL, ZMQ_HEARTBEAT_TIMEOUT and ZMQ_HEARTBEAT_TTL), combined with a socket monitor. Turn heartbeating on, and the library will ping the connection to and fro at the configured interval. If one end unexpectedly disappears, the surviving end learns about it via the socket monitor.
These options certainly exist in the C/C++ bindings. I don't know for sure about the pyzmq library, but I'd be surprised if it didn't support them. It has to understand heartbeat pings from other implementations, since it has no option but to comply with the protocol standard. If so, why not expose the functionality?
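For what it's worth, here is a minimal, untuned sketch of how the client side might look in pyzmq, assuming a build against libzmq 4.2 or newer so that zmq.HEARTBEAT_IVL and zmq.HEARTBEAT_TIMEOUT are available. The helper names make_monitored_client and wait_for_reply are hypothetical, as are the interval and timeout values. The idea is to poll the REQ socket and its monitor socket together, so the client waits indefinitely for a reply but returns early if the monitor reports a disconnect.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def make_monitored_client(context, address, interval_ms=1000, timeout_ms=3000):
    """Hypothetical helper: REQ socket with heartbeats plus its monitor socket."""
    sock = context.socket(zmq.REQ)
    # Set the heartbeat options before connect() so that they apply
    # to the connection that is about to be created.
    sock.setsockopt(zmq.HEARTBEAT_IVL, interval_ms)
    sock.setsockopt(zmq.HEARTBEAT_TIMEOUT, timeout_ms)
    # The monitor is a PAIR socket that receives connection events.
    monitor = sock.get_monitor_socket(
        events=zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
    sock.connect(f'tcp://{address}')
    return sock, monitor


def wait_for_reply(sock, monitor):
    """Return the reply, or None if the monitor reports a disconnect first."""
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    poller.register(monitor, zmq.POLLIN)
    while True:
        ready = dict(poller.poll())  # No timeout: wait indefinitely.
        if sock in ready:
            return sock.recv()
        if monitor in ready:
            event = recv_monitor_message(monitor)
            if event['event'] == zmq.EVENT_DISCONNECTED:
                return None
```

In the client loop from the question, a None result would take the place of the zmq.Again branch: close the socket with linger=0, call sock.disable_monitor() and close the monitor socket, then ask find_available_server() for a new address. With this in place the server would no longer need the say_wait thread, though I have not tested how quickly a silently dropped link is detected with these values.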