Search code examples
python-multiprocessingzeromqpublish-subscribepyzmqpython-sockets

Communication between multiple processes with zmq



I am having n processes that have their own local data and actions and I want each process to send a "snapshot" of it's local data to the rest running nodes-processes.
The code that I have so far looks like that:
def node1():
    Process(target=sync_1).start()
    sleep(4)
    data = {'node': 1, 'data': 'node 1 data'}

    context_b = zmq.Context()
    socket_b = context_b.socket(zmq.PUB)

    connnected = False
    try:
        socket_b.bind("tcp://*:%s" % 5560)
        connnected = True
    except Exception as e:
        print(e)
    if connnected:
        topic = "101"
        try:
            socket_b.send_string(topic + ' ' + json.dumps(data))
        except Exception as e:
                print(e)
    socket_b.close()
    context_b.term()

def node2():
    Process(target=sync_2).start()

def sync_1():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

     try:
         raw = socket_c.recv().decode("utf-8")
         json0 = raw.find('{')
         topic = raw[0:json0].strip()
         msg = json.loads(raw[json0:])
         print("[SYNC 1] received {}-{}]".format(topic, msg))
     except Exception as e:
         print(e)

def sync_2():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560

    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

    try:
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 2] received {}-{}]".format(topic, msg))
    except Exception as e:
        print(e)

if __name__ == '__main__':
    Process(target=node1).start()
    Process(target=node2).start()

Each node has one "listener" process running at the background (the sync function) in order to receive each node data and use it accordingly, and it works fine when all the sub sockets are connected to one node (node 1 in that case) but I want each node to send data to all the listeners , so I am not sure how to implement that since the listener processes can connect to one port.

Also, the nodes will have to send the local data snapshot every time there is an update , so this cannot be an one time communication , therefore I thought of having listener processes actively waiting for updates all the times.

I believe a diagram could be useful for that problem:
enter image description here

There can be a way easier way to solve this issue , so any help would be highly appreciated!


Solution

  • Update: The solution was to use the XPUB-XSUB pattern.
    By using this pattern I created a proxy thread that allowed me to do exactly what I wanted.
    The most useful example I could find for Python is this .