Tags: apache-kafka, kafka-consumer-api

Build change history from kafka topic


I have a topic in Kafka where I have records for which I want to build a change history in a consumer.

To do this, whenever a message with a new value for an existing record is appended to the topic (i.e., a message whose key already exists in the topic), my consumer would retrieve all messages with the same key and build a diff between the values of the last two messages.

I can't use infinite retention because the topic would grow too big, so I was hoping to use compaction, but I can't find a way to be sure that the old message won't get removed from the log before the consumer gets to read it.

Is there a way in which I can configure cleanup to achieve this? Or should I take a different approach for this use case?


Solution

  • can't find a way to be sure that the old message won't get removed from the log before the consumer gets to read it

    The LogCleaner runs on its own schedule, and only on closed segments; for compacted topics, only on segments that are both closed and dirty, where "dirty" means they contain old values for keys that have newer ones.
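    That said, you can delay how soon the cleaner is allowed to touch a record. A sketch of the relevant topic-level configs (standard Kafka settings; the values shown are illustrative, not recommendations):

    ```properties
    cleanup.policy=compact
    # A record is not eligible for compaction until this long after its
    # append time, giving consumers a window to read the old value
    # before it is compacted away (default is 0).
    min.compaction.lag.ms=86400000
    # The cleaner only considers a log once at least this fraction of it
    # is dirty.
    min.cleanable.dirty.ratio=0.5
    # Smaller segments roll (close) sooner and thus become cleanable sooner.
    segment.ms=604800000
    ```

    Even with a generous `min.compaction.lag.ms`, this is a lower bound on retention of old values, not a guarantee that a lagging consumer will always see them, which is why the aggregation approach below is safer.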

    consumer would retrieve all messages of the same key

    Compaction would actively remove exactly that data. You would need to aggregate the values into a list yourself rather than let Kafka delete them through compaction.

    So, one approach would be two topics: the first holding your regular data, as a compacted topic

    (k1, v1)
    (k2, v2)
    (k1, v3)
    

    Then some process consumes that topic and builds its own stream of value lists

    (k1, [v1])
    (k2, [v2])
    (k1, [v1, v3])
    
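    The aggregating process can be sketched in plain Python (pure logic only; in a real deployment this loop would be a Kafka consumer/producer pair, or a Kafka Streams aggregation, rather than in-memory lists):

    ```python
    from collections import defaultdict

    def build_history_stream(records):
        """Consume (key, value) records in order and, for each input record,
        emit (key, [all values seen so far for that key])."""
        history = defaultdict(list)
        out = []
        for key, value in records:
            history[key].append(value)
            # Emit a copy so each output record is an immutable snapshot,
            # like a message produced to the second topic would be.
            out.append((key, list(history[key])))
        return out

    records = [("k1", "v1"), ("k2", "v2"), ("k1", "v3")]
    for key, values in build_history_stream(records):
        print(key, values)
    ```

    Note this process must itself be restartable: on restart it would rebuild `history` by reading the second (compacted) topic from the beginning, since the latest record per key already carries the full list.
    
    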

    This second topic would also be compacted, so the first (k1, [v1]) record would eventually be removed, but the latest record per key carries the full history of previous values, and if you build a KTable over it you can look up all values by key.
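    With the full value list per key available, the diff the question asks for is just a comparison of the last two entries. A minimal sketch, assuming the values are dicts (a hypothetical record shape; adapt to your actual value type):

    ```python
    def diff_last_two(history):
        """Given a key's ordered value history, return {field: (old, new)}
        for every field that changed between the last two values."""
        if len(history) < 2:
            return {}  # nothing to diff yet
        old, new = history[-2], history[-1]
        changed = {}
        for field in set(old) | set(new):  # fields present in either value
            if old.get(field) != new.get(field):
                changed[field] = (old.get(field), new.get(field))
        return changed

    print(diff_last_two([{"name": "a", "qty": 1}, {"name": "a", "qty": 2}]))
    ```
    
    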

    The other solution is to dump the data to an external database table if you will not have enough storage in Kafka to hold all those values, assuming your database has more storage available.
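    A sketch of that sink, using an in-memory SQLite table as a stand-in for the external database (the schema and function names are hypothetical; a real consumer would call `record_change` per message and commit its Kafka offset only after the insert succeeds):

    ```python
    import sqlite3

    # Append-only history table: every message becomes a row, keyed by the
    # record key and ordered by the Kafka offset it was read from.
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE change_history (
            key          TEXT    NOT NULL,
            value        TEXT    NOT NULL,
            kafka_offset INTEGER NOT NULL
        )
    """)

    def record_change(key, value, offset):
        conn.execute(
            "INSERT INTO change_history (key, value, kafka_offset) VALUES (?, ?, ?)",
            (key, value, offset),
        )
        conn.commit()

    def last_two(key):
        """Return the last two values for a key, oldest first."""
        rows = conn.execute(
            "SELECT value FROM change_history WHERE key = ? "
            "ORDER BY kafka_offset DESC LIMIT 2",
            (key,),
        ).fetchall()
        return [r[0] for r in rows][::-1]

    record_change("k1", "v1", 0)
    record_change("k2", "v2", 1)
    record_change("k1", "v3", 2)
    print(last_two("k1"))
    ```

    With this layout, the source topic itself can use plain time-based retention (or even compaction), since the database, not Kafka, is the system of record for history.
    
    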