
Is it possible to filter Apache Kafka messages by retention time?


From an abstract point of view, Apache Kafka stores data in topics, and consumers read that data.

I'd like to have a monitoring consumer that finds records older than a certain age. The monitor should warn subsystems that records are still unread and will be discarded by Kafka once they reach the retention time.

So far I haven't found a suitable way to do this.


Solution

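  • You can use KafkaConsumer.offsetsForTimes() to map timestamps to offsets: for each partition, it returns the earliest offset whose timestamp is greater than or equal to the timestamp you pass in.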

    For example, if you call it with yesterday's timestamp and it returns offset X for a partition, then every message in that partition with an offset smaller than X is older than yesterday.

    Your logic can then compare X with the current positions of your consumers to determine whether unprocessed records are at risk of being discarded.

    Note that there is currently a KIP under discussion to expose metrics to track that: https://cwiki.apache.org/confluence/display/KAFKA/KIP-223+-+Add+per-topic+min+lead+and+per-partition+lead+metrics+to+KafkaConsumer

    Javadoc for offsetsForTimes(): http://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-
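
The comparison described in this answer could look roughly like the sketch below. It deliberately leaves out the Kafka client so that it runs on its own: the class RetentionRiskCheck, the method unreadOlderThanCutoff, and the use of plain strings such as "orders-0" as partition keys are all hypothetical names chosen for illustration. In a real monitor, the cutoff offsets would presumably come from KafkaConsumer.offsetsForTimes() and the committed offsets from the monitored consumer group.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class RetentionRiskCheck {

    /**
     * Counts, per partition, the unread records that are older than a cutoff time.
     *
     * cutoffOffsets:    partition -> first offset whose timestamp is >= the cutoff
     *                   (assumed to have been obtained via offsetsForTimes()).
     * committedOffsets: partition -> next offset the monitored group will read.
     *
     * Records with offsets in [committed, cutoff) are both unread and older
     * than the cutoff, so they are the ones at risk.
     */
    static Map<String, Long> unreadOlderThanCutoff(Map<String, Long> cutoffOffsets,
                                                   Map<String, Long> committedOffsets) {
        Map<String, Long> atRisk = new TreeMap<>();
        for (Map.Entry<String, Long> entry : cutoffOffsets.entrySet()) {
            long cutoff = entry.getValue();
            // Simplifying assumption: a partition without a committed offset is
            // treated as unread from offset 0. A real monitor would more likely
            // use the partition's beginning offset here.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            long unread = cutoff - committed;
            if (unread > 0) {
                atRisk.put(entry.getKey(), unread);
            }
        }
        return atRisk;
    }

    public static void main(String[] args) {
        // Made-up sample data: the group is behind the cutoff on orders-0 only.
        Map<String, Long> cutoff = Map.of("orders-0", 120L, "orders-1", 80L);
        Map<String, Long> committed = Map.of("orders-0", 100L, "orders-1", 95L);
        System.out.println(unreadOlderThanCutoff(cutoff, committed)); // prints {orders-0=20}
    }
}
```

Choosing a cutoff somewhat younger than the topic's retention time gives the subsystems a window to catch up before the records are actually deleted. Note that offsetsForTimes() returns null for a partition that has no record at or after the requested timestamp; in that case the partition's end offset is a reasonable stand-in for the cutoff offset.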