apache-kafka spring-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka

Restoration of GlobalKTables is extremely slow


Since we introduced GlobalKTables into the streams of several services, the startup time of those services has become unbearably long. We have a listener that observes state store restoration, and this is what appears in the logs:

[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Subscribed to partition(s): wrwks-bef-equipmenttyp-aggregat-privat-1-4
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Seeking to offset 4 for partition wrwks-bef-equipmenttyp-aggregat-privat-1-4
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreStart for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-4
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - started state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, remaining partitions: 1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreEnd for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-4
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - finished state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Subscribed to partition(s): wrwks-bef-equipmenttyp-aggregat-privat-1-1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Seeking to offset 1 for partition wrwks-bef-equipmenttyp-aggregat-privat-1-1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreStart for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - started state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, remaining partitions: 1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreEnd for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - finished state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Subscribed to partition(s): wrwks-bef-equipmenttyp-aggregat-privat-1-6
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Seeking to offset 1 for partition wrwks-bef-equipmenttyp-aggregat-privat-1-6
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreStart for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-6
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - started state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, remaining partitions: 1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreEnd for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-6
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - finished state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Subscribed to partition(s): wrwks-bef-equipmenttyp-aggregat-privat-1-7
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Seeking to offset 1 for partition wrwks-bef-equipmenttyp-aggregat-privat-1-7
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreStart for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-7
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - started state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, remaining partitions: 1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreEnd for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-7
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - finished state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Subscribed to partition(s): wrwks-bef-equipmenttyp-aggregat-privat-1-3
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-global-consumer, groupId=null] Seeking to offset 4 for partition wrwks-bef-equipmenttyp-aggregat-privat-1-3
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] DEBUG a.w.b.p.h.StreamAndStateStoreStateListener - onRestoreStart for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, partition wrwks-bef-equipmenttyp-aggregat-privat-1-3
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - started state restore for store wrwks-bef-equipmenttyp-aggregat-privat-1-STATE-STORE-0000000005, remaining partitions: 1
[wrwks-bef-projekt-export-projektAggregate-5e88016d-ae03-49fe-80b0-46c2238f528d-GlobalStreamThread] INFO  a.w.b.p.h.StreamAndStateStoreStateListener - publishing availability information: liveness false, readiness false

This topic contains only 20 records, yet bootstrapping takes several minutes when it completes at all. In other cases it never completes, which leads to repeated restarts by the k8s watchdog after a wait time of 5 minutes in our cluster.

A very annoying workaround is to delete the state store directory and restart the service. After that, the service usually starts within a reasonable amount of time.
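For reference, the manual workaround can be sketched in plain Java as below. This is only an illustration, not what our services actually run: the class name `StateDirCleaner` and the method `deleteRecursively` are made up for this example, and the path has to be the directory your application really uses (by default Kafka Streams keeps state under the configured `state.dir`, in a subdirectory named after the `application.id`). It must only run while the streams instance is stopped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: wipes a local state directory before the service starts.
public final class StateDirCleaner {

    private StateDirCleaner() {
    }

    /**
     * Deletes stateDir and everything below it.
     * Returns true if the directory existed, false if there was nothing to delete.
     */
    public static boolean deleteRecursively(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(stateDir)) {
            // Reverse order visits children before their parent directories,
            // so every directory is already empty when it is deleted.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            // Unwrap so callers see the original IOException.
            throw e.getCause();
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: StateDirCleaner <state-store-directory>");
            return;
        }
        Path stateDir = Paths.get(args[0]);
        boolean deleted = deleteRecursively(stateDir);
        System.out.println((deleted ? "deleted " : "nothing to delete at ") + stateDir);
    }
}
```

Kafka Streams also offers `KafkaStreams#cleanUp()`, which removes the local state of the application and may only be called while the instance is not running. Either way the state is rebuilt from the topic on the next start, so this hides the symptom rather than explaining it.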

This behaviour applies to all services using GlobalKTables. All services use Kafka Streams via Spring Cloud Stream 2021.0.5.


Solution

  • In the meantime, both the services (now on Spring Cloud 2022.0.1 with Kafka Streams 3.3.2) and the brokers (Kafka 3.0.1) have been upgraded, and the issue has vanished.