Search code examples
javaconsumerapache-pulsarpulsar

Pulsar Consumer is not consuming fast enough


I have a pulsar client and consumer code as below.

PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarServerUrl).enableTlsHostnameVerification(false)
            .listenerThreads(1).ioThreads(1)
            .build();

    BatchReceivePolicy batchReceivePolicy
                    = BatchReceivePolicy.builder().maxNumBytes(10 *
            1024 * 1024).maxNumMessages(100000).timeout(-1, TimeUnit.SECONDS).build();
    consumer = client.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName(subscriptionName)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
            .batchReceivePolicy(batchReceivePolicy)

            .messageListener((consumer1, msg) -> {
                LOG.info("Message Received from Pulsar : " + new String(msg.getData()));
               
                consumer1.acknowledgeAsync(msg);
            })
            .subscribe();

But with this code, there is a huge backlog in pulsar topic. Its around 12 million. My question is how to make pulsar consumer consume more faster? So that the backlog will be 0. BTW, I have tried batch receive as well, but no luck.


Solution

  • Which version are you using?

    If you are using Java Client 2.8.3, 2.9.1, or 2.10.0 It might be related to this one https://github.com/apache/pulsar/pull/15162 This is a performance regression and fixed in 2.10.1 and 2.9.3. For the 2.8.x version, 2.8.4 is in the release process, it will be available in 2 ~ 3 weeks.

    If not the above problem. It's better to provide more context about how high throughput of the topic.