
Apache Pulsar - What is the behaviour of the Consumer.seek() method by timestamp?


https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-

When calling the seek(long timestamp) method on the consumer, does the timestamp have to equal the exact time a message was published? For example, if I sent three messages at t=1, 5, 7 and I call consumer.seek(3), will I get an error? Or will my consumer get reset to t=3, so that if I call consumer.next(), I'll get my second message?

Thanks in advance,


Solution

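  • Consumer#seek(long timestamp) resets your subscription to a given timestamp; the timestamp does not have to match the exact publish time of any message. After seeking, the consumer will start receiving messages with a publish time equal to or greater than the timestamp passed to the seek method. In the example from the question, seek(3) would therefore position the consumer so that the next message received is the one published at t=5.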

    The example below shows how to reset a consumer to one hour ago:

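    // Imports needed by this snippet
    import java.time.Duration;
    import java.time.Instant;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionMode;
    import org.apache.pulsar.client.api.SubscriptionType;

    try (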
        // Create PulsarClient
        PulsarClient client = PulsarClient
            .builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
        // Create Consumer subscription
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
            .subscribe()
    ) {
        // Seek consumer to previous hour
        consumer.seek(Instant.now().minus(Duration.ofHours(1)).toEpochMilli());
        while (true) {
            final Message<String> msg = consumer.receive();
            System.out.printf(
                "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                msg.getKey(),
                msg.getValue(),
                msg.getTopicName(),
                msg.getMessageId().toString());
            consumer.acknowledge(msg);
        }
    }
    

    Note that if multiple consumers belong to the same subscription (e.g., a Key_Shared subscription), all of them will be reset.
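
    The "equal to or greater than" rule can be checked without a broker. The following is only a toy model of that rule, not the Pulsar client or broker implementation: the class name SeekByTimestampModel and the method firstDeliveredAfterSeek are made up for illustration, and the publish times are assumed to be sorted in ascending order.

```java
import java.util.List;
import java.util.Optional;

// Toy model of seek-by-timestamp semantics (hypothetical helper, not part of Pulsar).
final class SeekByTimestampModel {

    // Returns the publish time of the first message whose publish time is
    // equal to or greater than the seek timestamp, or empty if none qualifies.
    // Assumes publishTimes is sorted in ascending order.
    static Optional<Long> firstDeliveredAfterSeek(List<Long> publishTimes, long timestamp) {
        return publishTimes.stream()
                .filter(t -> t >= timestamp)
                .findFirst();
    }

    public static void main(String[] args) {
        // Messages published at t=1, 5, 7, as in the question.
        List<Long> publishTimes = List.of(1L, 5L, 7L);

        System.out.println(firstDeliveredAfterSeek(publishTimes, 3L)); // Optional[5]
        System.out.println(firstDeliveredAfterSeek(publishTimes, 5L)); // Optional[5]
        System.out.println(firstDeliveredAfterSeek(publishTimes, 8L)); // Optional.empty
    }
}
```

    In this model, an empty result only means that none of the listed messages qualifies; it says nothing about how a real consumer behaves once new messages arrive.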