apache-pulsar

Why is there a delay delivering messages from new topic when subscribed via regex?


I have a Python program that receives messages from Pulsar and updates a document database. This works great when the messages are published to a pre-existing topic. But when a new topic is created during publish, it takes 45 seconds or so before the subscription receives the message from the new topic. The code is pretty straightforward:

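import re
import time

import pulsar

# Event, Person, load_topic, collection and aggregates (a pulsar.Client) are
# assumed to be defined elsewhere in the program.


def event_received(consumer, message):
    event = Event.from_json(message.properties()['event_type'], message.data().decode('utf-8'))
    history = load_topic(f'persistent://demo/person/{event.id}')
    person = Person('{"id": "%s"}' % event.id)
    person.load(history)
    person.apply(event)
    collection.replace_one({"id": event.id}, person.to_map(), True)
    # Acknowledge the message so it is not redelivered later.
    consumer.acknowledge(message)


aggregates.subscribe(re.compile('demo/person/.*'), 'person-mongodb', message_listener=event_received,
                     regex_subscription_mode=pulsar.RegexSubscriptionMode.PersistentOnly,
                     initial_position=pulsar.InitialPosition.Earliest)

try:
    # The listener runs on client threads; keep the main thread alive without busy-waiting.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    aggregates.close()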

The publish code is equally straightforward:

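import json

from flask import request

# app (a Flask application) and aggregates (a pulsar.Client) are assumed to be defined elsewhere.


@app.post('/person/created')
def person_created():
    producer = aggregates.create_producer(f'persistent://demo/person/{request.json["id"]}')
    producer.send(json.dumps(request.json).encode('utf-8'), properties={'event_type': 'PersonCreated'})
    producer.close()
    return {}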

Can anyone suggest a reason why there is such a long delay prior to the event being received if it is delivered to a new topic?


Solution

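  • With a regex consumer, the client polls the brokers every minute (by default) to discover topics that were created after the consumer subscribed.

    This explains the ~45-second delay you have seen: a new topic is only picked up at the next poll, so depending on when it was created, the delay can be anywhere from a moment up to a full minute.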
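
    The size of the delay depends on where the topic's creation falls in the polling cycle. Below is a rough model, not the client's actual code: it assumes a fixed 60-second interval that starts when the consumer subscribes and ignores lookup latency. A topic created t seconds after subscribing is picked up at the next multiple of the period, so a topic created 15 seconds in waits 45 seconds.

```python
import math

# Simplified model of poll-based topic discovery (an assumption for illustration,
# not the Pulsar client's implementation): the client re-checks the topic list
# every `period` seconds, counting from the moment the consumer subscribes.
POLL_PERIOD_S = 60.0  # assumed to match the client's default discovery interval


def discovery_delay(created_at: float, period: float = POLL_PERIOD_S) -> float:
    """Seconds between a topic's creation and the first poll that can see it.

    `created_at` is measured in seconds since the consumer subscribed.
    """
    if created_at < 0:
        raise ValueError("created_at must be >= 0")
    # Polls happen at period, 2*period, ...; take the first one after creation.
    next_poll = (math.floor(created_at / period) + 1) * period
    return next_poll - created_at


if __name__ == "__main__":
    for t in (15.0, 59.0, 75.0, 130.5):
        print(f"topic created at t={t:>6.1f}s -> discovered after {discovery_delay(t):4.1f}s")
```

    If waiting up to a minute is too long, the Python client's `Client.subscribe()` accepts a `pattern_auto_discovery_period` argument (in seconds, default 60) that, as far as I know, sets this polling interval. Lowering it (e.g. to 5) shrinks the worst case in this model to 5 seconds, at the cost of more frequent topic-list lookups against the brokers.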

    Keep in mind that when the consumer finally discovers the topic, no data will be missing, because you have set initial_position=pulsar.InitialPosition.Earliest on the consumer.
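
    Why `Earliest` matters here can be shown with a toy topic log. The names below (`InitialPosition` enum, `messages_seen`, the `later-event` entry) are hypothetical stand-ins, not Pulsar's API. By the time the consumer discovers the new topic, the `PersonCreated` message is already stored, so a subscription starting at the latest position would skip it, while one starting at the earliest position reads it.

```python
from enum import Enum
from typing import List


class InitialPosition(Enum):
    # Hypothetical stand-in mirroring the two options of pulsar.InitialPosition.
    EARLIEST = "earliest"
    LATEST = "latest"


def messages_seen(log: List[str], published_before_discovery: int,
                  position: InitialPosition) -> List[str]:
    """Return the messages a newly created subscription would read.

    `log` is the topic's full message sequence; the first
    `published_before_discovery` entries were written before the regex
    consumer noticed the topic and created its subscription on it.
    """
    if position is InitialPosition.EARLIEST:
        start = 0  # cursor starts at the first stored message
    else:
        start = published_before_discovery  # cursor starts after existing messages
    return log[start:]


if __name__ == "__main__":
    log = ["PersonCreated", "later-event"]
    # Assume only the first event was published before the topic was discovered.
    print(messages_seen(log, 1, InitialPosition.EARLIEST))  # ['PersonCreated', 'later-event']
    print(messages_seen(log, 1, InitialPosition.LATEST))    # ['later-event']
```

    In Pulsar, the initial position only applies when the subscription is first created on a topic; an existing subscription resumes from its stored cursor.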

    In the Java client, discovery notifications have already been made push-based so that new topics are detected immediately. This was done as part of PIP-145 (https://github.com/apache/pulsar/issues/14505). A further explanation was also published here: https://streamnative.io/blog/improving-regular-expression-based-subscriptions-pulsar-consumers

    These improvements will eventually be implemented in the Pulsar C++ client and made avail