Tags: apache-kafka, apache-kafka-streams

Kafka Streams: application stuck when the committed offset no longer exists


I also described this issue in the Kafka Jira: https://issues.apache.org/jira/browse/KAFKA-13014

We have a Kafka Streams application running with multiple instances and threads.

It consumes from many topics.

One of the topic partitions was inaccessible for a day, and the topic's retention is 4 hours.

After we fixed the problem, the application kept trying to consume from an offset that no longer exists:

According to kafka-consumer-groups --describe, the current offset that the application is waiting for is 59754934, but the first available offset of this partition is now 264896001.
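To make the gap concrete, here is a minimal sketch in plain Java (no Kafka dependency) that classifies a committed offset against the range a partition still retains. The class and method names are hypothetical, and the log end offset is an invented placeholder because the question does not state it; only the two offsets quoted above come from the question.

```java
public class OffsetRangeCheck {

    enum Position { BELOW_LOG_START, IN_RANGE, ABOVE_LOG_END }

    // Classifies a committed offset against the offsets the partition still retains.
    static Position classify(long committedOffset, long logStartOffset, long logEndOffset) {
        if (committedOffset < logStartOffset) {
            return Position.BELOW_LOG_START; // data at this offset was deleted by retention
        }
        if (committedOffset > logEndOffset) {
            return Position.ABOVE_LOG_END;
        }
        return Position.IN_RANGE;
    }

    // Number of offsets between the committed position and the first retained offset.
    static long offsetsNoLongerAvailable(long committedOffset, long logStartOffset) {
        return Math.max(0L, logStartOffset - committedOffset);
    }

    public static void main(String[] args) {
        long committed = 59_754_934L;  // current offset from the question
        long logStart = 264_896_001L;  // first available offset from the question
        long logEnd = 264_900_000L;    // hypothetical: not given in the question

        System.out.println(classify(committed, logStart, logEnd));
        System.out.println(offsetsNoLongerAvailable(committed, logStart));
    }
}
```

With the values from the question, the committed offset falls below the log start, and 205141067 offsets separate it from the first record that can still be read.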

The problem is that Kafka Streams does not throw any exception.

This is the only log output I see:


08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with
    Assigned partitions:                       [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10]
    Current owned partitions:                  [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10]
    Added partitions (assigned - owned):       []
    Revoked partitions (owned - assigned):     []
08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Notifying assignor about the new Assignment(partitions=[adm__article_stock_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_ean_repartition_v3-10], userDataSize=398)
08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] No followup rebalance was requested, resetting the rebalance schedule.
08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.internals.TaskManager - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] Handle new assignment with:
    New active tasks: [0_10]
    New standby tasks: [0_17, 0_21]
    Existing active tasks: [0_10]
    Existing standby tasks: [0_17, 0_21]
08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Adding newly assigned partitions:

FYI: Kafka broker version: 5.3.4-ccs


Solution

  • The problem was more complicated than I thought.

    It is caused by combining exactly_once with a state store.

    When the application crashes without waiting for the stream to shut down, the transaction containing the latest messages of the changelog topic is aborted. So when we restarted the application, the topology waited to reload the local RocksDB store before it started consuming messages.

    That is where the bug is, because there is a check that consumer-metadata "topic.lastoffset" == current_consumer_offset

    But it should be:

    consumer-metadata "topic.last_committed_message_and_transaction_offset" == current_consumer_offset

    I worked around it by switching to at_least_once, but I think it was fixed in 2.7.1.
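For reference, the workaround amounts to a single configuration change. Below is a minimal sketch, assuming the application builds its Streams configuration from a java.util.Properties object. The class and method names are hypothetical and the bootstrap server address is a placeholder; `application.id`, `bootstrap.servers` and `processing.guarantee` are standard Kafka Streams configuration keys, written here as plain strings so the snippet has no Kafka dependency.

```java
import java.util.Properties;

public class StreamsGuaranteeConfig {

    // Builds a Streams configuration that uses at-least-once processing
    // instead of exactly_once.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // The workaround: "at_least_once" instead of "exactly_once".
        props.put("processing.guarantee", "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        // The application id matches the groupId seen in the log;
        // the broker address is a placeholder.
        Properties props = build("talaria-data-mixed-prod", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor as usual. Keep in mind the trade-off: with at_least_once, records can be reprocessed after a failure, so downstream consumers may see duplicates.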