We are using the functional programming model of Spring Cloud Stream and the Apache Kafka Streams binder. Unfortunately, we are currently not able to access the KafkaEvents - in particular the ListenerContainerIdleEvent.
@EventListener
public void events(KafkaEvent event) {
log.info(event);
}
Our topology looks like this:
@Configuration
public class CodeObjectTopology {
@Bean
public Consumer<KStream<String, CodeObject>> codeObject(KafkaProtobufSerde<CodeObject> codeObjectSerde) {
return input -> input
.process((ProcessorSupplier<String, CodeObject, String, CodeObject>) DataProcessor::new);
}
}
And our application.yaml looks like this:
spring:
cloud:
function:
definition: codeObject
stream:
events:
enabled: true
kafka.streams:
binder:
application-id: app-id
deserializationExceptionHandler: logAndFail
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
schema.registry.url: xxx
auto.offset.reset: earliest
idle-event-interval: 5000
functions:
codeObject:
application-id: input-topic.v1
configuration:
idle-event-interval: 5000
bindings:
codeObject-in-0:
consumer:
destination-is-pattern: false
idle-event-interval: 5000
bindings:
codeObject-in-0:
destination: input-topic.v1
consumer:
auto-startup: true
function:
definition: codeObject
If we try changing the KafkaEvent to SpringApplicationEvent we are getting the corresponding events.
Ultimately we want to be able to tell when a Kafka Streams function is idle (or has lag of 0).
Spring is not involved at runtime for Kafka Streams (KStream
), only the initial setup; there is no equivalent of spring events for Kafka streams (as far as I know). There is no listener container; you probably need to implement your own mechanism.
For the message channel binder, add a ListenerContainerCustomizer
bean:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> cust() {
return (container, group, dest) -> container.getContainerProperties().setIdleEventInterval(5000L);
}
But that doesn't apply to KStream
consumers.