google-cloud-platform, google-cloud-pubsub, google-cloud-python

Google PubSub Python multiple subscriber clients receiving duplicate messages


I have a pretty straightforward app that starts a Pub/Sub StreamingPull subscriber client. I have it deployed on Kubernetes so I can scale. When I have a single pod deployed, everything works as expected. When I scale to 2 containers, I start getting duplicate messages. I know that a small number of duplicate messages is to be expected, but almost half the messages, sometimes more, are received multiple times.

My process takes about 600ms to handle a message. The subscription acknowledgement deadline is set to 600s. I published 1000 messages, and the subscription was emptied in less than a minute, but the acknowledge_message_operation metric shows ~1500 calls, a small number of them with response_code expired. There were no failures in my process, and all messages were acked upon processing. Logs show that the same message was received by the two containers at the exact same time. The minute it took to process all the messages was well below the subscription's 600s acknowledgement deadline, and the Python client is supposed to handle lease management, so I'm not sure why there were any expired messages at all. I also don't understand why the same message is sent to multiple subscriber clients at the same time.

Minimal working example:

import time

from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'

def subscribe(sleep_time=None):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)

    def callback(message):
        print(message.data.decode())
        if sleep_time:
            # Simulate per-message processing time.
            time.sleep(sleep_time)
        print(f'acking {message.data.decode()}')
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}')
    future.result()


def publish(num_messages):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
    # Publish num_messages small messages in a quick burst.
    for i in range(num_messages):
        publisher.publish(topic_path, str(i).encode())

In two terminals, run subscribe(1). In a third terminal, run publish(200). For me, this will give duplicates in the two subscriber terminals.


Solution

  • It is unusual for two subscribers to get the same message at the same time unless:

    1. The message got published twice due to a retry (and therefore, as far as Cloud Pub/Sub is concerned, there are two messages). In this case, the content of the two messages would be the same, but their message IDs would differ. It is therefore worth checking the service-provided message ID to confirm that the messages are indeed duplicates (see the sketch after this list).
    2. The subscribers are on different subscriptions, which means each of the subscribers would receive all of the messages.
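
    To rule out case 1, print the service-assigned ID next to the payload: redeliveries of one message carry the same ID, while a message that was published twice has two distinct IDs. A minimal tweak to the callback from the question, using the message_id attribute the Python client exposes:

    def callback(message):
        # message_id is assigned by Cloud Pub/Sub at publish time. Redeliveries
        # of the same message share it; a message published twice gets two IDs.
        print(f'{message.message_id}: {message.data.decode()}')
        message.ack()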

    If neither of these is the case, then duplicates should be relatively rare. There is an edge case in dealing with large backlogs of small messages with streaming pull (which is what the Python client library uses). If very small messages are published in a burst and subscribers then consume that burst, it is possible to see the behavior you are seeing: all of the messages end up being sent to one of the two subscribers and are buffered behind its flow control limit on the number of outstanding messages. Those buffered messages can exceed their ack deadline, resulting in redelivery, likely to the other subscriber, while the first subscriber still has them in its buffer and will process them, too. Tightening the client's flow control settings, as sketched below, can reduce this buffering.
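
    If this edge case is what you are hitting, capping how many messages each streaming pull connection may hold outstanding can reduce the buffering. A sketch using the flow_control argument that subscriber.subscribe accepts; the limit of 100 is an illustrative value, not a tuned recommendation, and the project and subscription names are the ones from the question:

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        'my-project', 'duplicate-test')

    def callback(message):
        message.ack()

    # Bound how many messages the client buffers at once, so a burst is not
    # held client-side past its ack deadline while waiting for the callback.
    flow_control = pubsub_v1.types.FlowControl(max_messages=100)
    future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control)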

    However, if you are consistently seeing two freshly started subscribers immediately receive the same messages with the same message IDs, then you should contact Google Cloud support with your project name, subscription name, and a sample of the message IDs. They will be better able to investigate why this immediate duplication is happening.