Tags: python-3.x, google-cloud-platform, publish-subscribe, google-cloud-pubsub

GCP message stays in Pub/Sub after being acknowledged


I have Pub/Sub subscription logic wrapped in a subscribe method that is called once per subscription during service initialization:

    def subscribe(self,
                  callback: typing.Callable,
                  subscription_name: str,
                  topic_name: str,
                  project_name: typing.Optional[str] = None) -> typing.Optional[SubscriberClient]:
        """Subscribes to a Pub/Sub topic and returns the subscriber client

        :param callback: subscription callback method
        :param subscription_name: name of the subscription
        :param topic_name: name of the topic
        :param project_name: optional project name. Uses default project if not set
        :return: subscriber client or None if testing
        """
        project = project_name if project_name else self.pubsub_project_id
        self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))

        project_path = self.pubsub_subscriber.project_path(project)
        topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
        subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)

        # check if there is an existing subscription, if not, create it
        if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
            self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
            self.pubsub_subscriber.create_subscription(subscription_path, topic_path)

        # subscribe to the topic
        self.pubsub_subscriber.subscribe(
            subscription_path, callback=callback,
            scheduler=self.thread_scheduler
        )
        return self.pubsub_subscriber

This method is called like this:

        self.subscribe_client = self.subscribe(
            callback=self.pubsub_callback,
            subscription_name='subscription_topic',
            topic_name='topic'
        )

The callback method does a bunch of stuff, sends two emails, and then acknowledges the message:

    def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
        self.logger.debug('Processing pub sub message')

        try:
            self.do_something_with_message(data)

            self.logger.debug('Acknowledging the message')
            data.ack()
            self.logger.debug('Acknowledged')
            return

        except Exception:
            self.logger.warning({
                "message": "Failed to process Pub/Sub message",
                "request_size": data.size,
                "data": data.data
            }, exc_info=True)

        self.logger.debug('Acknowledging the message 2')
        data.ack()

When I push a message to the subscription, the callback runs and prints all the debug messages, including Acknowledged. The message, however, stays in Pub/Sub, and the callback gets called again, with the delay growing exponentially after each retry. What could cause the message to stay in Pub/Sub even after ack is called?

I have several such subscriptions, and all the others work as expected. The ack deadline does not seem to be the cause: the callback finishes almost immediately, and I experimented with the ack deadline anyway, but nothing helped.

When I process these messages from a locally running app connected to the same Pub/Sub, processing completes just fine and the acknowledgement takes the message out of the queue as expected.

  • So the problem manifests only in the deployed service (running inside a Kubernetes pod)
  • The callback executes, but ack seemingly does nothing
  • Acking messages from a script running locally (...and doing the exact same stuff) or through the GCP UI works as expected.

Any ideas?


Solution

  • I did some additional testing and I finally found the problem.

    TL;DR: I was using the same google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler for all subscriptions.

    Here are the snippets of the code I used to test it. This is the broken version:

    server.py

    import concurrent.futures.thread
    import os
    import time
    
    from google.api_core.exceptions import AlreadyExists
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
    
    
    def create_subscription(project_id, topic_name, subscription_name):
        """Create a new pull subscription on the given topic."""
        subscriber = pubsub_v1.SubscriberClient()
        topic_path = subscriber.topic_path(project_id, topic_name)
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)
    
        subscription = subscriber.create_subscription(
            subscription_path, topic_path)
    
        print('Subscription created: {}'.format(subscription))
    
    
    def receive_messages(project_id, subscription_name, t_scheduler):
        """Receives messages from a pull subscription."""
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)
    
        def callback(message):
            print('Received message: {}'.format(message.data))
            message.ack()
    
        subscriber.subscribe(subscription_path, callback=callback, scheduler=t_scheduler)
        print('Listening for messages on {}'.format(subscription_path))
    
    
    project_id = os.getenv("PUBSUB_PROJECT_ID")
    
    publisher = pubsub_v1.PublisherClient()
    project_path = publisher.project_path(project_id)
    
    # Create both topics
    try:
        topics = [topic.name.split('/')[-1] for topic in publisher.list_topics(project_path)]
        if 'topic_a' not in topics:
            publisher.create_topic(publisher.topic_path(project_id, 'topic_a'))
        if 'topic_b' not in topics:
            publisher.create_topic(publisher.topic_path(project_id, 'topic_b'))
    except AlreadyExists:
        print('Topics already exist')
    
    # Create subscriptions on both topics
    sub_client = pubsub_v1.SubscriberClient()
    project_path = sub_client.project_path(project_id)
    
    try:
        subs = [sub.name.split('/')[-1] for sub in sub_client.list_subscriptions(project_path)]
        if 'topic_a_sub' not in subs:
            create_subscription(project_id, 'topic_a', 'topic_a_sub')
        if 'topic_b_sub' not in subs:
            create_subscription(project_id, 'topic_b', 'topic_b_sub')
    except AlreadyExists:
        print('Subscriptions already exist')
    
    scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))
    
    receive_messages(project_id, 'topic_a_sub', scheduler)
    receive_messages(project_id, 'topic_b_sub', scheduler)
    
    while True:
        time.sleep(60)
    

    client.py

    import datetime
    import os
    import random
    import sys
    from time import sleep
    
    from google.cloud import pubsub_v1
    
    
    def publish_messages(pid, topic_name):
        """Publishes multiple messages to a Pub/Sub topic."""
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(pid, topic_name)
    
        for n in range(1, 10):
            data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
            data = data.encode('utf-8')
            publisher.publish(topic_path, data=data)
            sleep(random.randint(10, 50) / 10.0)
    
    
    project_id = os.getenv("PUBSUB_PROJECT_ID")
    publish_messages(project_id, sys.argv[1])
    

    I connected to Cloud Pub/Sub, and the server created the topics and subscriptions. Then I ran the client script multiple times in parallel for both topics. Shortly after I changed the server code to instantiate a new thread scheduler inside the scope of receive_messages, the server cleaned up both topics and functioned as expected.

    The confusing part is that in both cases the server printed the received-message line for every message.
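
For reference, the change described above (one scheduler per subscription instead of one shared instance) might look roughly like the sketch below. It is not the exact code from my test: the make_scheduler helper and the scheduler_factory parameter are names made up for illustration, and passing the subscriber client in as an argument is a choice of this sketch rather than something server.py does.

```python
import concurrent.futures


def make_scheduler(max_workers=10):
    """Return a new ThreadScheduler backed by its own thread pool."""
    # Imported lazily so the helper can be swapped out where the
    # google-cloud-pubsub package is not installed (a choice made for
    # this sketch, not something the original server.py does).
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    return ThreadScheduler(executor)


def receive_messages(subscriber, project_id, subscription_name,
                     scheduler_factory=make_scheduler):
    """Start a streaming pull that uses a scheduler of its own."""
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        message.ack()

    # The key change: build the scheduler here, once per subscription,
    # instead of passing one shared instance to every subscribe() call.
    scheduler = scheduler_factory()
    future = subscriber.subscribe(
        subscription_path, callback=callback, scheduler=scheduler)
    print('Listening for messages on {}'.format(subscription_path))
    return future
```

With this in place, the server would call receive_messages(pubsub_v1.SubscriberClient(), project_id, 'topic_a_sub') and the same for 'topic_b_sub', without creating a module-level scheduler.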

    I am going to post this to https://github.com/googleapis/google-cloud-python/issues