def node1():
Process(target=sync_1).start()
sleep(4)
data = {'node': 1, 'data': 'node 1 data'}
context_b = zmq.Context()
socket_b = context_b.socket(zmq.PUB)
connnected = False
try:
socket_b.bind("tcp://*:%s" % 5560)
connnected = True
except Exception as e:
print(e)
if connnected:
topic = "101"
try:
socket_b.send_string(topic + ' ' + json.dumps(data))
except Exception as e:
print(e)
socket_b.close()
context_b.term()
def node2():
Process(target=sync_2).start()
def sync_1():
context_c = zmq.Context()
socket_c = context_c.socket(zmq.SUB)
_port = 5560
try:
socket_c.connect("tcp://localhost:%s" % _port)
except Exception as e:
print(e)
topicfilter = "101"
socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')
try:
raw = socket_c.recv().decode("utf-8")
json0 = raw.find('{')
topic = raw[0:json0].strip()
msg = json.loads(raw[json0:])
print("[SYNC 1] received {}-{}]".format(topic, msg))
except Exception as e:
print(e)
def sync_2():
context_c = zmq.Context()
socket_c = context_c.socket(zmq.SUB)
_port = 5560
try:
socket_c.connect("tcp://localhost:%s" % _port)
except Exception as e:
print(e)
topicfilter = "101"
socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')
try:
raw = socket_c.recv().decode("utf-8")
json0 = raw.find('{')
topic = raw[0:json0].strip()
msg = json.loads(raw[json0:])
print("[SYNC 2] received {}-{}]".format(topic, msg))
except Exception as e:
print(e)
if __name__ == '__main__':
Process(target=node1).start()
Process(target=node2).start()
Each node has one "listener" process running at the background (the sync function) in order to receive each node data and use it accordingly, and it works fine when all the sub sockets are connected to one node (node 1 in that case) but I want each node to send data to all the listeners , so I am not sure how to implement that since the listener processes can connect to one port.
Also, the nodes will have to send the local data snapshot every time there is an update , so this cannot be an one time communication , therefore I thought of having listener processes actively waiting for updates all the times.
I believe a diagram could be useful for that problem:
There can be a way easier way to solve this issue , so any help would be highly appreciated!
Update:
The solution was to use the XPUB-XSUB pattern.
By using this pattern I created a proxy thread that allowed me to do exactly what I wanted.
The most useful example I could find for Python is this .