Search code examples
pythonnetwork-programmingzeromq

PUSH-PULL in ZMQ for other machine


I need to create a circular sending of a single packet across machines. To do this, I use the PUSH-PULL pattern from ZMQ. But I'm faced with the problem of assigning ports and I can't figure out exactly how to work with them. Nodes do not connect to each other. Only the first one managed to make a connect to the right neighbor.

My code:

import zmq
from zmq import Socket


class ZmqFirstNode:
    def __init__(self,
                 right_neighbour: str,
                 left_neighbour: str,
                 zmq_port: int,
                 host_ip: str):

        self.right_neighbour = right_neighbour
        self.left_neighbour = left_neighbour
        self.zmq_port = zmq_port
        self.host_ip = host_ip

    def start_sending_messages(self, side: str):
        if side == 'right':
            recipient, sender = self.right_neighbour, self.left_neighbour
        else:
            recipient, sender = self.left_neighbour, self.right_neighbour

        producer_socket, result_collector_socket = self.__create_sockets(recipient, sender)

        self.__send_packet(producer_socket, recipient)

        response = self.__receive_response(result_collector_socket)
        print(response)

    def __create_sockets(self, recipient: str, sender: str) -> tuple[Socket, Socket]:
        context = zmq.Context()

        # для отправки сообщения
        producer_socket = context.socket(zmq.PUSH)
        producer_socket.connect(f'tcp://{recipient}:{self.zmq_port}')

        # для получения сообщения
        result_collector_socket = context.socket(zmq.PULL)
        result_collector_socket.bind(f'tcp://{sender}:{self.zmq_port + 1}')

        return producer_socket, result_collector_socket

    def __send_packet(self, socket: Socket, neighbour: str) -> None:
        message = self.__create_packet(neighbour)
        print(message)
        socket.send_json(message)

    @staticmethod
    def __receive_response(socket: Socket):
        response = socket.recv_json()
        return response

    def __create_packet(self, addressee: str) -> dict:
        return {'from': self.host_ip, 'to': addressee, 'message': 'test'}


class ZmqConsumerNode:
    def __init__(self,
                 right_neighbour: str,
                 left_neighbour: str,
                 zmq_port: int,
                 host_ip: str):

        self.right_neighbour = right_neighbour
        self.left_neighbour = left_neighbour
        self.zmq_port = zmq_port
        self.host_ip = host_ip

    def start_sending_messages(self, side: str):
        if side == 'right':
            addressee, sender = self.right_neighbour, self.left_neighbour
        elif side == 'left':
            addressee, sender = self.left_neighbour, self.right_neighbour
        else:
            raise

        consumer_receiver_socket, consumer_sender_socket = self.__create_sockets(addressee, sender)

        packet = self.__receive_response(consumer_receiver_socket)
        self.__send_packet(consumer_sender_socket, addressee, packet)

    def __create_sockets(self, addressee: str, sender: str) -> tuple[Socket, Socket]:
        context = zmq.Context()

        # print(f"Connecting to {ip_addr}:{port}")
        # для получения
        consumer_receiver_socket = context.socket(zmq.PULL)
        consumer_receiver_socket.bind(f'tcp://{sender}:{self.zmq_port}')

        # для отправления
        consumer_sender_socket = context.socket(zmq.PUSH)
        try:
            consumer_sender_socket.connect(f'tcp://{addressee}:{self.zmq_port}')
        except zmq.error.ZMQError:
            consumer_sender_socket.connect(f'tcp://{self.right_neighbour}:{self.zmq_port + 1}')

        return consumer_receiver_socket, consumer_sender_socket

    def __send_packet(self, socket: Socket, neighbour: str, packet) -> None:
        self.__change_packet(neighbour, packet)
        print(packet)
        socket.send_json(packet)

    @staticmethod
    def __receive_response(socket: Socket):
        response = socket.recv_json()
        return response

    def __create_packet(self, addressee: str) -> dict:
        return {'from': self.host_ip, 'to': addressee, 'message': 'test'}

    def __change_packet(self, addressee: str, packet: dict):
        return packet.update({'from': self.host_ip, 'to': addressee})

I tried changing connection methods and ports using try/except, but it also didn't help, maybe I'm doing something wrong because I haven't figured it out thoroughly, but I've already read quite a lot of documentation :_)

Screenshots: First node All node


Solution

  • The problem was that the bind() method does not need to specify the address where the message should come from, but where it will come from.