Search code examples
pythonmessage-queuepublish-subscribepyzmq

pyzmq client hangs forever on receiving data


Here's simple client-server example on pub-sub pattern with zeromq, created by chatGPT v4. It's pretty self-explanatory and serve for pattern demonstraion purpose. The problem is that client hangs forver on while receiving data from message server.Code looks fine, but I am newcomer at zeromq so I can be wrong. Anyway, any suggestions how to fix this will be appreciated!

Server.py

# -*- coding: utf-8 -*-
import zmq
import threading
import json
import os
import time

# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")

# Save message info to a dictionary (a stand-in for your database)
messages = {}

# Send messages
consumers = ['consumer1', 'consumer2', ]

for i in range(10):
    message_id = str(i)
    file_path = f'./content/{i}'

    # Save message info to the database
    messages[message_id] = {
        'file_path': file_path,
        'consumers': consumers.copy(),  # copy the list because we're going to modify it
        'processed_by': [],
    }

    # Send the message to all consumers
    publisher.send_json({
        'message_id': message_id,
        'text': f'This is message {i}',
        'media_path': file_path,
    })


# Cleanup process
def cleanup():
    while True:
        for message_id, message in messages.items():
            #print(message.items())
            #print(message['processed_by'])
            if set(message['consumers']) == set(message['processed_by']):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]

        time.sleep(5)  # pause between cleanup runs

cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()

# Receive acknowledgements
while True:
    # Wait for next request from client
    message = router.recv_json()
    print(f"Received request: {message}")

    # Process the acknowledgement
    if message['message_id'] in messages:
        messages[message['message_id']]['processed_by'].append(message['consumer'])

    #time.sleep(5)

Client.py

import zmq
import time

# Prepare context and sockets
context = zmq.Context()
consumer_id = 'consumer1'  # change this for each consumer

# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

# Connect the dealer socket for sending acknowledgements
dealer = context.socket(zmq.DEALER)
dealer.identity = consumer_id.encode()
dealer.connect("tcp://localhost:5557")



# Process messages
while True:
    message = subscriber.recv_json()   
    print(f"Received message: {message}")
    #print(message)

    # Send acknowledgement
    dealer.send_json({
        'message_id': message['message_id'],
        'consumer': consumer_id,
    })

    time.sleep(5)  # pause between processing messages

Code seems to look right and must work properly byt for some reason it won't

Running with python 3.10.5 on Windows 11


Solution

  • The main problem with your code is that PUB/SUB sockets are lossy -- there is no synchronization between publishers and subscribers, and if a subscriber is not connected and subscribed when a message is sent it will never see that message.

    By the time your client has connected and finished negotiating with the server, all of the messages have already been sent.

    If you (a) start the subscriber first and (b) add a sleep to your publisher to give the subscriber a chance to connect, you'll see messages being received by the subscriber as expected.

    Then you'll encounter a second problem:

    When you receive a message on a ROUTER socket, that message is a multipart message consisting of the client id followed by the actual message data. So when you write:

    message = router.recv_json()
    

    This will blow up because you'll be receiving the client id, rather than the JSON message data, and this will fail with:

    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    A working version of the server might look like this:

    import json
    import zmq
    import threading
    import time
    
    
    # Cleanup process
    def cleanup():
        while True:
            for message_id, message in messages.items():
                # print(message.items())
                # print(message['processed_by'])
                if set(message["consumers"]) == set(message["processed_by"]):
                    print(f"Deleting file {message['file_path']}")
                    # os.remove(message['file_path'])  # uncomment this to actually delete the file
                    del messages[message_id]
    
            time.sleep(5)  # pause between cleanup runs
    
    
    # Connect the publisher socket
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5556")
    
    # Connect the router socket for receiving acknowledgements
    router = context.socket(zmq.ROUTER)
    router.bind("tcp://*:5557")
    
    # Save message info to a dictionary (a stand-in for your database)
    messages = {}
    
    # Send messages
    consumers = [
        "consumer1",
        "consumer2",
    ]
    
    # give the subscriber a chance to connect
    time.sleep(2)
    
    for i in range(10):
        message_id = str(i)
        file_path = f"./content/{i}"
    
        # Save message info to the database
        messages[message_id] = {
            "file_path": file_path,
            "consumers": consumers.copy(),  # copy the list because we're going to modify it
            "processed_by": [],
        }
    
        # Send the message to all consumers
        publisher.send_json(
            {
                "message_id": message_id,
                "text": f"This is message {i}",
                "media_path": file_path,
            }
        )
    
    cleanup_thread = threading.Thread(target=cleanup)
    cleanup_thread.start()
    
    # Receive acknowledgements
    while True:
        # Wait for next request from client
        client, data = router.recv_multipart()
        message = json.loads(data)
        print(f"Received request: {message}")
    
        # Process the acknowledgement
        if message["message_id"] in messages:
            messages[message["message_id"]]["processed_by"].append(message["consumer"])
    
        # time.sleep(5)
    

    ...but this isn't great because of the issues with pub/sub sockets I mentioned earlier. If you're looking for some sort of synchronization between the client and the server, pub/sub sockets are the wrong solution.