Search code examples
pythongoogle-cloud-platformpublish-subscribegoogle-cloud-pubsub

How to Acknowledge a Google PubSub message using AckID in Python


I was going through the PubSub pull docs here

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0  # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
    streaming_pull_future.result(timeout=timeout)
except:  # noqa
    streaming_pull_future.cancel()

In the above example, message is ack-ed as soon as it is received. But I want to acknowledge only when my local celery workers finish processing the message so that PubSub can redeliver the message if the worker fails. So I take the ack_id of the message, and pass it onto the worker.

params["ack_id"] = message._ack_id
start_aggregation.delay(params)

I just can't figure out how I can use the ack_id in the worker to acknowledge the message. I know that you can use a pubsub end-point to ack a message like given here. But I can't figure out how I can use a service account credentials to do the same - they do it using OAuth in that doc. Any pointers are appreciated. Thanks.


Solution

  • Acking messages received from the client library with a direct call to the acknowledge API would cause issues in the client. The client has flow control limits, which determine the maximum number of messages that can be outstanding (delivered, but not acked). The removal of messages from the count occurs when one calls message.ack() or message.nack(). If you were to call the acknowledge API directly, then this count would not change, resulting in messages no longer flowing once the limit is reached.

    If you are trying to use celery to get more parallelism in your processing, you can probably do it directly without this intermediate step. One option is to start up instances of the subscriber client with the same subscription in different processes. The messages will be distributed among the subscribers. Alternatively, you could replace the scheduler with one that is process-based instead of thread-based, though that would be some more work.