
Spring Cloud Stream: How to get KafkaEvents like ListenerContainerIdleEvent?


We are using the functional programming model of Spring Cloud Stream with the Apache Kafka Streams binder. Unfortunately, we cannot receive any KafkaEvent instances, in particular the ListenerContainerIdleEvent. This is the listener we tried:

@EventListener
public void events(KafkaEvent event) {
    log.info("Received Kafka event: {}", event);
}

Our topology looks like this:

@Configuration
public class CodeObjectTopology {

    @Bean
    public Consumer<KStream<String, CodeObject>> codeObject(KafkaProtobufSerde<CodeObject> codeObjectSerde) {

        return input -> input
            .process((ProcessorSupplier<String, CodeObject, String, CodeObject>) DataProcessor::new);
    }
}

And our application.yaml looks like this:

spring:
  cloud:
    function:
      definition: codeObject
    stream:
      events:
        enabled: true
      kafka.streams:
        binder:
          application-id: app-id
          deserializationExceptionHandler: logAndFail
          configuration:
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
            schema.registry.url: xxx
            auto.offset.reset: earliest
            idle-event-interval: 5000
          functions:
            codeObject:
              application-id: input-topic.v1
              configuration:
                idle-event-interval: 5000
        bindings:
          codeObject-in-0:
            consumer:
              destination-is-pattern: false
              idle-event-interval: 5000
      bindings:
        codeObject-in-0:
          destination: input-topic.v1
          consumer:
            auto-startup: true

If we change the listener's parameter type from KafkaEvent to SpringApplicationEvent, we do receive the corresponding events.

Ultimately, we want to be able to tell when a Kafka Streams function is idle (or has a lag of 0).


Solution

  • For Kafka Streams (KStream), Spring is only involved in the initial setup, not at runtime. There is no listener container, and as far as I know there is no equivalent of the Spring Kafka events for Kafka Streams, so you probably need to implement your own mechanism.

    For the message channel binder, you can enable idle events by adding a ListenerContainerCustomizer bean:

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> cust() {
        // Publish a ListenerContainerIdleEvent after 5 seconds without records
        return (container, destinationName, group) ->
                container.getContainerProperties().setIdleEventInterval(5000L);
    }
    

    But that doesn't apply to KStream consumers.
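
As a rough illustration of what "your own mechanism" could look like, here is a minimal sketch of an idle tracker in plain Java. The class name IdleTracker, its methods, and the injectable clock are all hypothetical; nothing here is part of Spring Cloud Stream or Kafka Streams. It only detects that no record has arrived for a given interval, which is not the same as a consumer lag of 0.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a Spring or Kafka class): remembers when the
 * last record was seen and reports whether the idle threshold has passed.
 */
class IdleTracker {

    private final long idleThresholdMs;
    private final LongSupplier clock;        // injectable clock, e.g. System::currentTimeMillis
    private final AtomicLong lastRecordAtMs; // written by the processing thread, readable elsewhere

    IdleTracker(Duration idleThreshold, LongSupplier clock) {
        this.idleThresholdMs = idleThreshold.toMillis();
        this.clock = clock;
        // Start the idle period at construction time.
        this.lastRecordAtMs = new AtomicLong(clock.getAsLong());
    }

    /** Call once per processed record. */
    void recordSeen() {
        lastRecordAtMs.set(clock.getAsLong());
    }

    /** Returns true if no record has been seen for at least the threshold. */
    boolean isIdle() {
        return clock.getAsLong() - lastRecordAtMs.get() >= idleThresholdMs;
    }
}
```

One possible way to use it, assuming a Processor like the DataProcessor from the question: call recordSeen() in process(), and check isIdle() from a wall-clock punctuator registered with ProcessorContext#schedule(Duration, PunctuationType.WALL_CLOCK_TIME, ...) in init(). What happens when the tracker reports idle (logging, publishing a custom Spring event, and so on) is up to the application. Note that each stream task gets its own processor instance, so a tracker held by the processor is per task rather than per application.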