I need to create a circular sending of a single packet across machines. To do this, I use the PUSH-PULL pattern from ZMQ. But I'm faced with the problem of assigning ports and I can't figure out exactly how to work with them. Nodes do not connect to each other. Only the first one managed to make a connect to the right neighbor.
My code:
import zmq
from zmq import Socket
class ZmqFirstNode:
def __init__(self,
right_neighbour: str,
left_neighbour: str,
zmq_port: int,
host_ip: str):
self.right_neighbour = right_neighbour
self.left_neighbour = left_neighbour
self.zmq_port = zmq_port
self.host_ip = host_ip
def start_sending_messages(self, side: str):
if side == 'right':
recipient, sender = self.right_neighbour, self.left_neighbour
else:
recipient, sender = self.left_neighbour, self.right_neighbour
producer_socket, result_collector_socket = self.__create_sockets(recipient, sender)
self.__send_packet(producer_socket, recipient)
response = self.__receive_response(result_collector_socket)
print(response)
def __create_sockets(self, recipient: str, sender: str) -> tuple[Socket, Socket]:
context = zmq.Context()
# для отправки сообщения
producer_socket = context.socket(zmq.PUSH)
producer_socket.connect(f'tcp://{recipient}:{self.zmq_port}')
# для получения сообщения
result_collector_socket = context.socket(zmq.PULL)
result_collector_socket.bind(f'tcp://{sender}:{self.zmq_port + 1}')
return producer_socket, result_collector_socket
def __send_packet(self, socket: Socket, neighbour: str) -> None:
message = self.__create_packet(neighbour)
print(message)
socket.send_json(message)
@staticmethod
def __receive_response(socket: Socket):
response = socket.recv_json()
return response
def __create_packet(self, addressee: str) -> dict:
return {'from': self.host_ip, 'to': addressee, 'message': 'test'}
class ZmqConsumerNode:
def __init__(self,
right_neighbour: str,
left_neighbour: str,
zmq_port: int,
host_ip: str):
self.right_neighbour = right_neighbour
self.left_neighbour = left_neighbour
self.zmq_port = zmq_port
self.host_ip = host_ip
def start_sending_messages(self, side: str):
if side == 'right':
addressee, sender = self.right_neighbour, self.left_neighbour
elif side == 'left':
addressee, sender = self.left_neighbour, self.right_neighbour
else:
raise
consumer_receiver_socket, consumer_sender_socket = self.__create_sockets(addressee, sender)
packet = self.__receive_response(consumer_receiver_socket)
self.__send_packet(consumer_sender_socket, addressee, packet)
def __create_sockets(self, addressee: str, sender: str) -> tuple[Socket, Socket]:
context = zmq.Context()
# print(f"Connecting to {ip_addr}:{port}")
# для получения
consumer_receiver_socket = context.socket(zmq.PULL)
consumer_receiver_socket.bind(f'tcp://{sender}:{self.zmq_port}')
# для отправления
consumer_sender_socket = context.socket(zmq.PUSH)
try:
consumer_sender_socket.connect(f'tcp://{addressee}:{self.zmq_port}')
except zmq.error.ZMQError:
consumer_sender_socket.connect(f'tcp://{self.right_neighbour}:{self.zmq_port + 1}')
return consumer_receiver_socket, consumer_sender_socket
def __send_packet(self, socket: Socket, neighbour: str, packet) -> None:
self.__change_packet(neighbour, packet)
print(packet)
socket.send_json(packet)
@staticmethod
def __receive_response(socket: Socket):
response = socket.recv_json()
return response
def __create_packet(self, addressee: str) -> dict:
return {'from': self.host_ip, 'to': addressee, 'message': 'test'}
def __change_packet(self, addressee: str, packet: dict):
return packet.update({'from': self.host_ip, 'to': addressee})
I tried changing connection methods and ports using try/except, but it also didn't help, maybe I'm doing something wrong because I haven't figured it out thoroughly, but I've already read quite a lot of documentation :_)
Screenshots: First node All node
The problem was that the bind()
method does not need to specify the address where the message should come from, but where it will come from.