scala apache-kafka apache-kafka-streams spring-kafka

How to store only latest key values in a kafka topic


I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.

I thought a KTable's whole purpose was to store the latest value for each key rather than the whole stream of events. However, I can't seem to get this to work. Running the code below creates the state store, but that store (maintopiclatest) contains a stream of events, not just the latest values. So if I send the same 1000 records to the topic twice, I see 2000 records rather than 1000.

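import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

// kStreamBuilder is a StreamsBuilder created elsewhere; the (de)serializers are custom classes (not shown)
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

var stream = kStreamBuilder.stream("maintopic",
    Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

// Reduce by key, always keeping the newest value, in a store named "maintopiclatest"
var table = stream
    .groupByKey()
    .reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));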

The other problem: if I want to store the KTable in a new topic, I'm not sure how to do that. It seems that I have to turn it back into a stream (via toStream()) so that I can call ".to()" on it. But then the topic contains the whole stream of events, not just the latest values.


Solution

  • This is not how a KTable works.

    A KTable itself has an internal state store and stores exactly one record per key. However, a KTable is constantly updated and subject to the so-called stream-table duality: each update to the KTable is sent downstream as a changelog record (https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables). Thus, each input record results in an output record.

    Because this is stream processing, there is no "last value per key".

    > I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.

    At which point in time do you want a KTable to emit an update? There is no answer to this question because the input stream is conceptually infinite.
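Both points in this answer can be illustrated with a small stdlib-only Java model. It is a sketch, not the Kafka Streams API: `KTableModel`, `ChangelogTable`, and `FlushingTable` are hypothetical names. `ChangelogTable` mimics `groupByKey().reduce((aggV, newV) -> newV)`: its store holds one value per key, yet every input record also produces a changelog record, so sending the same 1000 keys twice leaves 1000 stored entries but 2000 downstream records, matching what the question observes. `FlushingTable` emits only the latest value per key, but only when the caller picks a point in time by calling `flush()`; as soon as more data arrives, a key has to be emitted again.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Kafka Streams API); all names here are hypothetical.
public class KTableModel {

    // Mimics stream.groupByKey().reduce((aggV, newV) -> newV): the store keeps
    // one value per key, but every update is also forwarded downstream.
    static final class ChangelogTable {
        final Map<String, String> store = new LinkedHashMap<>();
        final List<String> changelog = new ArrayList<>();

        void process(String key, String value) {
            store.put(key, value);              // newest value replaces the old one
            changelog.add(key + "=" + value);   // ...and the update is emitted
        }
    }

    // Emits only the latest value per key, and only when the caller decides
    // to flush; the input itself never "ends".
    static final class FlushingTable {
        final Map<String, String> pending = new LinkedHashMap<>();
        final List<String> emitted = new ArrayList<>();

        void process(String key, String value) {
            pending.put(key, value);            // overwrite any unflushed value
        }

        void flush() {
            for (Map.Entry<String, String> e : pending.entrySet()) {
                emitted.add(e.getKey() + "=" + e.getValue());
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        // The same 1000 keys sent twice, as in the question.
        var table = new ChangelogTable();
        for (int round = 1; round <= 2; round++) {
            for (int i = 0; i < 1000; i++) {
                table.process("key-" + i, "v" + round);
            }
        }
        System.out.println(table.store.size());      // 1000
        System.out.println(table.changelog.size());  // 2000

        var flushing = new FlushingTable();
        flushing.process("a", "1");
        flushing.process("a", "2");
        flushing.process("b", "1");
        flushing.flush();               // emits a=2, b=1
        flushing.process("a", "3");     // more data arrives...
        flushing.flush();               // ...so "a" is emitted again
        System.out.println(flushing.emitted);  // [a=2, b=1, a=3]
    }
}
```

Kafka Streams' record caches and the `KTable#suppress()` operator are built around the same trade-off of choosing when to emit, though their real behaviour is more involved than this model.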