Tags: kafka-consumer-api, unsubscribe, reactor-kafka

Kafka Consumer (using reactor kafka) rejoins the group and is assigned topic partitions immediately after invoking unsubscribe


I am facing an issue while unsubscribing from a Kafka consumer; please note that we are using the reactor-kafka API. The consumer does unsubscribe successfully at first, but then it immediately rejoins the group and is assigned that topic's partitions, so it effectively remains subscribed the whole time and keeps consuming messages from the topic even when it is not supposed to!

Here is the code I am using for this:

import java.util.Collections;
import java.util.concurrent.ExecutorService;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

private final ReceiverOptions<String, byte[]> receiverOptions;
private final ExecutorService executorService;
private final String topic;
private Disposable disposable;

public void subscribe() {
    ReceiverOptions<String, byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
            .addAssignListener(partitions -> {/*some listener code here*/})
            .addRevokeListener(partitions -> {/*some listener code here*/});

    Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();

    // Consume records on the executor-backed scheduler and keep the Disposable
    // so the subscription can be cancelled later.
    disposable = kafkaFlux
            .publishOn(Schedulers.fromExecutor(executorService))
            .subscribe(record -> {/*consume the record here*/});
}

public void unsubscribe() {
    // Cancelling the subscription closes the underlying Kafka consumer,
    // which should make it leave the consumer group.
    if (disposable != null) {
        disposable.dispose();
    }
}

When I invoke the unsubscribe() method, the consumer revokes its partitions and sends a LeaveGroup request to the coordinator, as shown in the following logs:

ConsumerCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Revoke previously assigned partitions topic-2
AbstractCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Member consumer-cgid-2-f61f347c-b07c-4037-92f5-9a418ac8d153 sending LeaveGroup request to coordinator kafaka_server:9084 (id: 2147483644 rack: null) due to the consumer is being closed

However, immediately after that the following set of events happens, where the consumer is assigned the topic's partitions again and starts consuming messages from them. Please note that I have not invoked the subscribe() method here!

AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Attempt to heartbeat failed since group is rebalancing
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Revoke previously assigned partitions topic-0, topic-1
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] (Re-)joining group
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully joined group with generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully synced group in generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Notifying assignor about the new Assignment(partitions=[topic-0, topic-1, topic-2])
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Adding newly assigned partitions: topic-0, topic-2, topic-1
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Found no committed offset for partition topic-0
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-2 to the committed offset FetchPosition{offset=2049, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9084 (id: 3 rack: null)], epoch=14}}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-1 to the committed offset FetchPosition{offset=119509, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9083 (id: 2 rack: null)], epoch=19}}
SubscriptionState   |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Resetting offset for partition topic-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9082 (id: 1 rack: null)], epoch=22}}.

The versions used are:

org.apache.kafka:kafka-clients:2.8.0

org.apache.kafka:kafka-streams:2.8.0

io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE

It is worth noting that there is another service producing messages to the topic that this service consumes.

It could be something to do with a Kafka parameter, or maybe not, but if you have come across an issue like this and resolved it before, please let me know the solution.

Thank you,


Solution

  • I am answering my own question.

    I wrote a standalone program that subscribes to and unsubscribes from the topic in question, and that program worked as expected. This clearly shows that there was no problem with the Kafka parameters at all, so something was wrong within the application itself.
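
    For reference, a minimal standalone sketch along the lines of what I tested could look like the following (the bootstrap server, topic name and group id are placeholders, not the real configuration):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import reactor.core.Disposable;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    public class SubscribeUnsubscribeTest {

        public static void main(String[] args) throws InterruptedException {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "cgid");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

            ReceiverOptions<String, byte[]> options = ReceiverOptions.<String, byte[]>create(props)
                    .subscription(Collections.singleton("topic"));

            // Subscribe exactly once and keep the Disposable.
            Disposable disposable = KafkaReceiver.create(options)
                    .receive()
                    .subscribe(record -> System.out.println("Consumed offset " + record.offset()));

            // Let the consumer run for a while, then unsubscribe.
            Thread.sleep(Duration.ofSeconds(30).toMillis());
            disposable.dispose(); // the consumer leaves the group and stays out, as expected
        }
    }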

    After analysing the code and going through it line by line, I noticed that the subscribe() method was being invoked twice. I commented out one of those invocations, tested again, and everything behaved as expected.

    I never thought that by subscribing to the topic twice, the consumer would never be able to unsubscribe!

    NOTE - even after invoking unsubscribe() twice, this consumer would not unsubscribe from the topic. So if it has subscribed twice (or possibly more times - I have not tested that), it will never be able to unsubscribe at all!
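
    Looking at the code in the question, a likely explanation is that the second subscribe() call simply overwrites the disposable field, so the Disposable of the first KafkaReceiver is lost and that first consumer can never be disposed, no matter how many times unsubscribe() is called. A defensive sketch, assuming the same fields as in the question, would be to dispose any existing subscription before creating a new one:

    public void subscribe() {
        // Guard against double subscription: cancel any previous consumer first,
        // otherwise its Disposable is overwritten and can never be disposed.
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }

        ReceiverOptions<String, byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> {/*some listener code here*/})
                .addRevokeListener(partitions -> {/*some listener code here*/});

        disposable = KafkaReceiver.create(options)
                .receive()
                .publishOn(Schedulers.fromExecutor(executorService))
                .subscribe(record -> {/*consume the record here*/});
    }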

    Is this normal behavior from the Kafka perspective? I am not so sure; I am keeping this item open for others to respond...

    Thanks...