
ZMQ Python Request Reply Broker for Addressed Asynchronous Requests


I would like to use ZMQ to implement (in Python) a broker and a client that handle request-reply to addressed entities asynchronously. The client contains functionality for doing both requests and replies (the only thing missing is the exact socket type/pattern).

The request can be blocking, but the reply side needs to be able to handle parallel (threaded) requests as they come in (i.e. a REP socket is not good enough, since it requires a send before the next receive).

It needs to go through a broker since there will be many possible entities who can do requests and replies and I only want to bind a set number of ports (not one per entity).

Entity1                Broker                    Entity2
  REQ ------------- ROUTER ?????? -------------- ??????

Entity1 will know the ID of Entity2 and use that to make sure the request is made to Entity2 specifically. There can be any number of entities, but all entities that should answer to requests will register IDs.

I've tried with DEALER on the right side of the broker above, but that one will only send requests round-robin it seems.

So does anyone know a good pattern/set of sockets I could use in order to address a specific entity asynchronously?

Summary:

  • Blocking on the request-side
  • Broker/Proxy for binding a fixed number of ports
  • The replying socket should be specifically addressed by the requester
  • Threaded replies (Reply-side can receive and handle parallel requests)

I've been reading the ZMQ manual quite extensively, but I haven't found a really good pattern yet for addressing specific sockets through a broker, so any help is greatly appreciated.


Solution

  • After some further research and testing I found a pattern that seems to provide a solution to all my demands.

    Pattern

    Requester               Broker                   Replier
      REQ ------------- ROUTER ROUTER -------------- DEALER
                    (requests) (replies)
    

    Requester

    The request side of the client simply connects to the request router on the broker, sends a request and starts reading the socket for a reply:

    reqSocket.connect(self._reqAddress)
    reqSocket.send_multipart([repId, message])
    reply = reqSocket.recv_multipart()[0]
    

    The replier ID is included as the first part of the message, for example:

    Outgoing message: ['replierId', 'requestMsg']
    

    Request Router

    if self.reqRouterSocket in socketEvents:
        multipart = self.reqRouterSocket.recv_multipart()
        multipart = [multipart[-2]] + multipart
        del multipart[-2]
        self.repRouterSocket.send_multipart(multipart)
    

    I.e., the request router just takes the first part of the payload (the replierId) and moves it to the front of the address stack:

    Incoming message: ['reqSocketAddr', '', 'replierId', 'requestMsg']
    Outgoing message: ['replierId', 'reqSocketAddr', '', 'requestMsg']
    

    The outgoing message is sent from the replier router. Since the replier has its socket ID set to 'replierId' and has connected to the replier router, that router recognizes this address and is able to deliver the request successfully.
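
The frame shuffle done by the request router can be isolated into a small pure function, which makes it easy to check without any sockets. This is only a minimal sketch: the name `to_replier_frames` is hypothetical, and it assumes the requester always sends exactly `[replierId, message]` through a REQ socket, so the replier ID is the second-to-last frame.

```python
def to_replier_frames(frames):
    """Return a new frame list with the replier ID moved to the front.

    Assumes frames look like [reqSocketAddr, b'', replierId, message].
    """
    if len(frames) < 4:
        raise ValueError("expected [reqSocketAddr, b'', replierId, message]")
    replier_id = frames[-2]
    # Envelope and delimiter stay in place, the payload keeps only the message.
    return [replier_id] + frames[:-2] + frames[-1:]


incoming = [b"reqSocketAddr", b"", b"replierId", b"requestMsg"]
outgoing = to_replier_frames(incoming)
print(outgoing)
```

Unlike the in-place version, this leaves the incoming list untouched, which can make logging or debugging a little easier.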

    Replier

    The replier needs to set its own socket identity to some known value in order to be directly addressed as described above.

    NOTE: You have to set the socket id of the DEALER socket BEFORE you perform the connect to the reply router. To set the identity of the socket:

    self.dealerSocket.setsockopt(zmq.IDENTITY, b'replierId')  # bytes, required on Python 3
    

    Otherwise the router won't know the ID and will silently drop the messages.

    The replier listens for incoming requests. In my case this is all threaded and requests are handled asynchronously. That is the reason for using the DEALER socket instead of a regular REP, which in the synchronous case would be much easier. The DEALER socket can receive further requests without having to answer the first one first, which the REP has to. A simplified version of what is done on the replier side however is:

    multipart = self.dealerSocket.recv_multipart()
    returnRoute = multipart[:-1]
    requestMsg = multipart[-1]
    reply = someFunction(requestMsg)
    self.dealerSocket.send_multipart(returnRoute + [reply])
    

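    I.e., the replier just returns what it got, but with the request replaced by a reply. Note that the reply router consumes the leading 'replierId' frame when it delivers the request and prepends it again when it receives the reply, so the DEALER itself never sees that frame:

    Incoming message: ['reqSocketAddr', '', 'request']
    Outgoing message: ['reqSocketAddr', '', 'reply']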
    

    This outgoing message is then sent back to the reply router.
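
Because the handling is threaded, it can help to keep the envelope handling separate from the actual work. The sketch below is an assumption about how that could look, not the code used in this answer: `handle` and `some_function` are hypothetical names, and the frames are plain lists rather than real socket traffic. In a real replier, ZeroMQ sockets should not be shared freely between threads, so one common arrangement is to let a single thread own the DEALER socket and pass finished replies back to it.

```python
from concurrent.futures import ThreadPoolExecutor


def some_function(request):
    # Stand-in for the real request handler (hypothetical).
    return b"reply to " + request


def handle(frames):
    """Build reply frames: same return route, payload replaced by the reply."""
    return_route, request = frames[:-1], frames[-1]
    return return_route + [some_function(request)]


# Three requests, each a return route (envelope) followed by the payload.
requests = [[b"requester-%d" % i, b"", b"request %d" % i] for i in range(3)]

# Requests are processed in parallel; pool.map returns results in input order.
with ThreadPoolExecutor(max_workers=3) as pool:
    replies = list(pool.map(handle, requests))

for reply in replies:
    print(reply)
```

Each reply carries its own return route, so the order in which the workers finish does not matter to the broker.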

    Reply Router

    A router is chosen on this side of the broker purely because it needs to be able to address a specific socket among many connected ones.

    if self.repRouterSocket in socketEvents:
        multipart = self.repRouterSocket.recv_multipart()
        self.reqRouterSocket.send_multipart(multipart[1:])
    

    I.e., just pop the first address off the address stack and send the message over to the requesting side again.

    Incoming message: ['replierId', 'reqSocketAddr', '', 'reply']
    Outgoing message: ['reqSocketAddr', '', 'reply']
    

    The request router recognizes this address and sends the reply back to the requester, which receives:

    Incoming list: ['reply']
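
To see how the frames change on the whole round trip, the socket behaviour can be modelled with plain lists. This is a rough sketch rather than real pyzmq code: `router_recv` and `router_send` are hypothetical helpers that only imitate what a ROUTER does (prepend the sender's identity on receive, consume the first frame as the destination on send), and the REQ delimiter handling is written out by hand.

```python
def router_recv(peer_id, frames):
    # A ROUTER prepends the identity of the sending peer.
    return [peer_id] + frames


def router_send(frames):
    # A ROUTER uses the first frame as the destination and strips it.
    return frames[0], frames[1:]


# Requester: REQ adds an empty delimiter frame in front of the payload.
on_wire = [b""] + [b"replierId", b"requestMsg"]

# Request router: receive, then move the replier ID to the front.
at_req_router = router_recv(b"reqSocketAddr", on_wire)
forwarded = [at_req_router[-2]] + at_req_router[:-2] + at_req_router[-1:]

# Reply router sends to the replier; the DEALER gets the remaining frames.
dest, at_replier = router_send(forwarded)

# Replier: keep the return route, swap the request for a reply.
reply_frames = at_replier[:-1] + [b"reply"]

# Reply router: receive from the DEALER, pop the replier ID, pass it on.
at_rep_router = router_recv(dest, reply_frames)
back_dest, at_req_socket = router_send(at_rep_router[1:])

# REQ strips the empty delimiter before handing the reply to the caller.
delivered = at_req_socket[1:]
print(dest, at_replier, back_dest, delivered)
```

The trace shows why each router needs exactly one identity frame at the front of the message at the moment it sends.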
    

    This pattern seems to fulfill the requirements I set out in my question. I hope it can be of use to others as well.