
PYZMQ REQ-REP Multiple Clients ZMQError with Polling


With a REQ-REP pattern, I am trying to send requests to multiple clients with a timeout using poll, so that if the server does not receive a reply from the first client, it times out and moves on to the next client.

But it seems that after the initial timeout, the server is unable to send the next message to the second client.

I am getting this error: zmq.error.ZMQError: Operation cannot be accomplished in current state. It is raised on the line socket.send_string("Sensor Data") in the server.

Full output:

Connecting to machine...
Successfully connected to machine 127.0.0.1:9999
Successfully connected to machine 127.0.0.1:9998
Sending request  0 ...
Machine did not respond
Sending request  1 ...
Traceback (most recent call last):
  File "C:\Users\tobiw\Documents\Python\Raspberry Pi\zmq\REQrep\testSIMPLEpoll.py", line 16, in <module>
    socket.send_string("Sensor Data")
  File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 541, in send_string
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
  File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 384, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 771, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 249, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 244, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
    raise ZMQError(errno)
zmq.error.ZMQError: Operation cannot be accomplished in current state
[Finished in 5.3s with exit code 1]

Server:

import zmq
import json

ports = ["127.0.0.1:9999", "127.0.0.1:9998"]

context = zmq.Context()
print("Connecting to machine...")
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)
for port in ports:
    socket.connect("tcp://%s" % port)
    print("Successfully connected to machine %s" % port)

for request in range(len(ports)):
    print("Sending request ", request, "...")
    socket.send_string("Sensor Data")  # <-- error occurs here

    # use poll for timeouts:
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    socks = dict(poller.poll(5 * 1000))

    if socket in socks:
        try:
            msg_json = socket.recv()
            sens = json.loads(msg_json)
            response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
            print("Received reply ", request, "[", response, "]")
        except IOError:
            print("Could not connect to machine")
    else:
        print("Machine did not respond")

Client:

import zmq
import time
import json

port = "9998" # multiple similar clients but just with different ports

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

while True:
    #  Wait for next request from server
    message = str(socket.recv(), "utf-8")
    print("Received request: ", message)
    time.sleep(1)
    msgDict = {
        'sensor': "6",
        'data': "123456789",
        'client': "9876",
    }
    msg_json = json.dumps(msgDict)
    socket.send_string(msg_json)

If the server receives a reply from the first client, the send to the second client works fine. If it does not receive a reply from the first client, the error above is reproduced.
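
The behaviour described above can be reproduced without any client running. The following is a minimal sketch, assuming nothing is listening on the hypothetical port 5599 (that port is an illustration, not part of the original setup): the first send succeeds because ZeroMQ connects in the background, the poll times out, and the second send fails because the REQ socket is still waiting for a reply.

```python
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)  # do not block on exit for undelivered messages

# Hypothetical endpoint with no REP peer behind it; connect() still returns
# immediately because the connection is established asynchronously.
socket.connect("tcp://127.0.0.1:5599")

socket.send_string("first request")  # REQ now expects a recv before the next send

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timed_out = socket not in dict(poller.poll(200))  # no peer, so nothing arrives

error = None
try:
    socket.send_string("second request")  # second send with no recv in between
except zmq.ZMQError as exc:
    error = exc

print("timed out:", timed_out)
print("second send raised:", error)

socket.close()
context.term()
```

The error number on the exception should be zmq.EFSM, the code ZeroMQ uses when an operation does not fit the socket's current state.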


Solution

  • In a REQ-REP pattern, zmq.error.ZMQError: Operation cannot be accomplished in current state indicates that the strict send -> recv -> send -> recv order has been broken. In my case, the poll in the for-loop timed out, so the REQ server never called recv for that request. When the loop came back around it called send again, giving send -> timeout (no recv) -> send. That is two sends in a row, which a REQ socket does not allow.

    What I did to rectify it: I switched from a REQ-REP pattern to a DEALER-REP pattern. A DEALER socket does not enforce the send/recv alternation, so the server can send again after a timeout and still talk to multiple REP clients asynchronously.

    With the client staying the same, this is the new server for those who are interested:

    Server:

    import zmq
    import json
    
    ports = ["127.0.0.1:9999", "127.0.0.1:9998"]
    
    context = zmq.Context()
    print("Connecting to machine...")
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.LINGER, 0)
    for port in ports:
        socket.connect("tcp://%s" % port)
        print("Successfully connected to machine %s" % port)
    
    # register the socket once; the same poller is reused for every request
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    for request in range(len(ports)):
        print("Sending request ", request, "...")
        # REP expects an empty delimiter frame before the actual message
        socket.send_multipart([b"", b"Sensor Data"])

        # use poll for timeouts:
        socks = dict(poller.poll(5 * 1000))

        if socket in socks:
            try:
                # first frame is the delimiter, second is the actual message
                _, msg_json = socket.recv_multipart()
                sens = json.loads(msg_json)
                response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
                print("Received reply ", request, "[", response, "]")
            except (zmq.ZMQError, ValueError) as exc:
                # receive failed, wrong number of frames, or invalid JSON
                print("Could not read reply from machine:", exc)
        else:
            print("Machine did not respond")
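
For readers who would rather keep REQ-REP, another option is to use one short-lived REQ socket per client and discard it after a timeout, so a stuck socket is never reused. This is not what the answer above did. It is a sketch under assumptions: the helper name ask, the port 5599 (assumed to have nothing listening) and the "pong" reply are all hypothetical, and a throwaway REP thread stands in for a real client.

```python
import threading
import zmq


def ask(context, endpoint, payload, timeout_ms=1000):
    """Send one request on a fresh REQ socket; return the reply, or None on timeout."""
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # drop any unsent request when closing
    sock.connect(endpoint)
    try:
        sock.send_string(payload)
        if sock.poll(timeout_ms, zmq.POLLIN):  # non-zero when a reply is waiting
            return sock.recv_string()
        return None
    finally:
        sock.close()  # the socket is never reused, so its state cannot get stuck


def reply_once(sock):
    """Stand-in for a real client: answer a single request, then exit."""
    sock.recv()
    sock.send_string("pong")
    sock.close()


context = zmq.Context()

# One live REP peer on a random free port, handed to a worker thread.
rep = context.socket(zmq.REP)
live_port = rep.bind_to_random_port("tcp://127.0.0.1")
worker = threading.Thread(target=reply_once, args=(rep,))
worker.start()

# The first endpoint is assumed dead; the second is the live peer.
endpoints = ["tcp://127.0.0.1:5599", "tcp://127.0.0.1:%d" % live_port]
replies = [ask(context, ep, "Sensor Data", timeout_ms=500) for ep in endpoints]

worker.join()
print(replies)
context.term()
```

Compared with a single DEALER connected to every client, this addresses each client explicitly. A DEALER distributes outgoing messages round-robin among its connected peers, and a late reply to an earlier request can show up while polling for a later one, so replies may need to be matched to requests by the application.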