I am trying to adopt the ZeroMQ asynchronous client-server pattern described here with Python multiprocessing. A brief description is in the ZeroMQ guide: it uses DEALER/ROUTER for the client-to-server-frontend communication and DEALER/DEALER for the server-backend-to-worker communication. The server frontend and backend are connected using zmq.proxy().
Instead of using threads, I want to use multiprocessing on the server. But requests from the client do not reach the server workers. They do reach the server frontend and the backend, but the backend is not able to connect to the server workers.
How do we generally debug these issues in pyzmq? How do I turn on verbose logging for the sockets?
The Python code snippets I am using:
server.py
import zmq
import time
from multiprocessing import Process


def run(context, worker_id):
    socket = context.socket(zmq.DEALER)
    socket.connect("ipc://backend.ipc")
    print(f"Worker {worker_id} started")
    try:
        while True:
            ident, msg = socket.recv_multipart()
            print("Worker received %s from %s" % (msg, ident))
            time.sleep(5)
            socket.send_multipart([ident, msg])
            print("Worker sent %s from %s" % (msg, ident))
    except:
        socket.close()


if __name__ == "__main__":
    context = zmq.Context()

    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5570")

    backend = context.socket(zmq.DEALER)
    backend.bind("ipc://backend.ipc")

    N_WORKERS = 7
    jobs = []
    try:
        for worker_id in range(N_WORKERS):
            job = Process(target=run, args=(context, worker_id,))
            jobs.append(job)
            job.start()

        zmq.proxy(frontend, backend)

        for job in jobs:
            job.join()
    except:
        frontend.close()
        backend.close()
        context.term()
client.py
import re
import json
import zmq
from uuid import uuid4

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    identity = str(uuid4())
    socket.identity = identity.encode("ascii")
    socket.connect("tcp://localhost:5570")

    poll = zmq.Poller()
    poll.register(socket, zmq.POLLIN)

    request = {
        "body": "Some request body.",
    }
    socket.send_string(json.dumps(request))

    while True:
        for i in range(5):
            sockets = dict(poll.poll(10))
            if socket in sockets:
                msg = socket.recv()
                print(msg)
Q : "How to turn on verbose logging for the sockets?"
Start using the published native API socket_monitor() for all relevant details, reported as events arriving from the socket-(instance)-under-monitoring.
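In pyzmq this native-API feature is exposed as Socket.get_monitor_socket() plus zmq.utils.monitor.recv_monitor_message(). A minimal sketch, assuming the DEALER socket and endpoint from the client code above:

import zmq
from zmq.utils.monitor import recv_monitor_message

#  map the numeric event codes back to their zmq.EVENT_* names for readable output
EVENT_NAMES = {getattr(zmq, name): name
               for name in dir(zmq) if name.startswith("EVENT_")}

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
monitor = sock.get_monitor_socket()      # inproc PAIR socket delivering monitor events
sock.connect("tcp://localhost:5570")

while monitor.poll(1000):                # wait up to 1 s for the next event
    evt = recv_monitor_message(monitor)  # dict with 'event', 'value', 'endpoint'
    print(EVENT_NAMES.get(evt["event"], evt["event"]), evt["endpoint"])
    if evt["event"] == zmq.EVENT_MONITOR_STOPPED:
        break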
Q : "How do we generally debug these issues in
pyzmq
?"
There is no general strategy on doing this. Having gone into a domain of a distributed-computing, you will almost always create your own, project-specific, tools for "collecting" & "viewing/interpreting" a time-ordered flow of (principally) distributed-events.
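As one low-effort illustration of such a tool (the helper name and fields are an assumption, not part of the original code): stamp every event with wall-clock time and the process id, so the per-process logs can later be merged into a single time-ordered view.

import os
import time

def trace(event, **details):
    #  one line per event: epoch time, process id, event name, free-form details
    print("%.6f pid=%d %s %s" % (time.time(), os.getpid(), event, details), flush=True)

#  e.g. inside a worker loop:
#  trace("recv", nbytes=len(msg))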
Next, the shared Context()-instance: the Art of Zen of Zero strongly advocates avoiding any shape and form of sharing. Here, one and the very same Context()-instance is referenced ("shared") via the multiprocessing.Process process-instantiation call-signature interface, which does not make the inter-process "sharing" work. Let each spawned process-instance create its own Context()-instance and use it from inside its private space during its own life-cycle.
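A sketch of that change, keeping the rest of the posted server.py as it is (only the worker side shown): pass nothing but picklable arguments to the Process and let run() build, use, and release its own Context():

import zmq
import time
from multiprocessing import Process

def run(worker_id):
    context = zmq.Context()                  # each process owns its private Context()
    socket = context.socket(zmq.DEALER)
    socket.connect("ipc://backend.ipc")
    print(f"Worker {worker_id} started")
    try:
        while True:
            ident, msg = socket.recv_multipart()
            time.sleep(5)
            socket.send_multipart([ident, msg])
    finally:
        socket.close()                       # ... and releases it itself
        context.term()

#  in __main__ the only change is:
#  job = Process(target=run, args=(worker_id,))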
Btw, your code ignores any return codes, documented in the native API, that help you handle (in worse cases, debug post-mortem) what goes on alongside the distributed computing. The try: ... except: ... finally: scaffolding also helps a lot here.
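In pyzmq those native-API error states surface as exceptions rather than return codes, so the equivalent discipline (sketched here under that assumption) is to catch zmq.ZMQError explicitly and to release every socket and the Context() in a finally: block:

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
try:
    socket.connect("tcp://localhost:5570")
    socket.send(b"ping")
    reply = socket.recv()
except zmq.ZMQError as e:
    #  e.errno carries the native-API error code, str(e) the human-readable text
    print("ZeroMQ error:", e)
finally:
    socket.close(linger=0)
    context.term()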
Anyway, the sooner you learn to stop using the blocking forms of the { .send() | .recv() | .poll() }-methods, the better your code will start to re-use the actual powers of ZeroMQ.
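For illustration only (not part of the original post), the non-blocking form in pyzmq means passing zmq.NOBLOCK and handling zmq.Again, so the loop keeps doing useful work instead of stalling inside .recv():

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5570")

while True:
    try:
        msg = socket.recv(zmq.NOBLOCK)   # returns immediately; raises zmq.Again if empty
        print("got", msg)
    except zmq.Again:
        pass
    #  ... other useful work happens here instead of being blocked ...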