Search code examples
google-cloud-platformpublish-subscribegoogle-cloud-pubsubsubscribegoogle-cloud-pubsublite

Pubsublite subscribe extremely slow for first message


I'm using GCP pubsublite, naive topic with one partition, a few messages. Python client lib. Doing the standard subscription, subscribe client, subscribe method with callback. Between the method is called and the first message is received, it takes about 30 seconds. Subsequent messages are fast because they are already in the cache.

Question about the extremely long onset: is it expected? Or is there some usual suspects?

Thanks for pointers.

Edit: The code is pasted below. It runs in Docker after editing credentials. My printout is

[2022-02-22 14:22:36.162 __main__]  subscribe started
[2022-02-22 14:23:09.187 __main__]  got 1
[2022-02-22 14:23:09.189 __main__]  got 2
[2022-02-22 14:23:09.189 __main__]  got 3
# Using python 3.8

from __future__ import annotations

import logging
import pickle
import queue
import uuid
from contextlib import contextmanager

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
                                                 SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
                                           LocationPath,
                                           ReservationPath, SubscriptionPath,
                                           TopicPath,
                                           )
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials


logging.getLogger('google.cloud').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

FORMAT = '[%(asctime)s.%(msecs)03d %(name)s]  %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')


class Account:
    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        path = self.reservation_path(name)
        reservation = GCPReservation(name=str(path),
                                     throughput_capacity=capacity)
        self.client.create_reservation(reservation)
        # logger.info('reservation %s created', name)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        # A topic name can not be reused within one hour of deletion.
        top_path = self.topic_path(name)
        res_path = self.reservation_path(reservation_name)

        topic = GCPTopic(
            name=str(top_path),
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=str(res_path)))

        self.client.create_topic(topic)
        # logger.info('topic %s created', name)

        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        path = self.topic_path(name)
        self.client.delete_topic(path)
        # logger.info('topic %s deleted', name)

    def get_topic(self, name: str) -> Topic:
        return Topic(name, self)


class Topic:

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self, name: str) -> Subscription:
        path = self.account.subscription_path(name)
        Conf = GCPSubscription.DeliveryConfig
        subscription = GCPSubscription(
            name=str(path),
            topic=str(self._path),
            delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))

        self.account.client.create_subscription(subscription, BacklogLocation.BEGINNING)
        # logger.info('subscription %s created for topic %s', name, self.name)

        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        path = self.account.subscription_path(name)
        self.account.client.delete_subscription(path)
        # logger.info('subscription %s deleted from topic %s', name, self.name)

    def get_subscription(self, name: str):
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        with Publisher(self, **kwargs) as pub:
            yield pub


class Publisher:
    def __init__(self, topic: Topic):
        self.topic = topic
        self._publisher = None

    def __enter__(self):
        self._publisher = PublisherClient(credentials=self.topic.account.credentials)
        self._publisher = self._publisher.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        self._publisher.__exit__(*args, **kwargs)

    def put(self, data) -> None:
        fut = self._publisher.publish(self.topic._path, pickle.dumps(data))
        fut.result()


class Subscription:
    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self):
        with Subscriber(self) as sub:
            yield sub


class Subscriber:
    def __init__(self, subscription: Subscription):
        self.subscription = subscription
        self._messages = queue.Queue()
        self._subscriber = None
        self._subscribe_task = None

    def __enter__(self):
        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            self._messages.put(msg)

        flowcontrol = FlowControlSettings(
                messages_outstanding=1000, bytes_outstanding=10*1024*1024)
        self._subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
        self._subscriber.__enter__()
        self._subscribe_task = self._subscriber.subscribe(self.subscription._path, callback, flowcontrol)
        logger.info('subscribe started')
        return self

    def __exit__(self, *args, **kwargs):
        self._subscribe_task.cancel()
        self._subscribe_task.result()
        self._subscriber.__exit__(*args, **kwargs)

    def get(self):
        msg = self._messages.get()
        msg.ack()
        return pickle.loads(msg.data)


def get_account() -> Account:
    return Account(project_id='--fill-in--',
                   region='us-central1',
                   zone='us-central1-a',
                   credentials='--fill-in--')


# This test shows that it takes extremely long to get the first messsage
# in `subscribe`.
def test(account):
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    try:
        with topic.get_publisher() as p:
            p.put(1)
            p.put(2)
            p.put(3)

        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                z = s.get()
                z = s.get()
                z = s.get()
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)


if __name__ == '__main__':
    a = get_account()
    try:
        a.create_reservation('default')
    except AlreadyExists:
        pass

    test(a)

Solution

  • Google support confirmed the slowness is expected behavior. They plan to add this to the public doc:

    "Subscribers are expected to be long lived. If a subscription has no current subscribers, creating the first subscriber may take up to 1 minute to start receiving messages."