Tags: apache-kafka, apache-kafka-streams, ktable

Kafka KTable changelog (using toStream()) is missing some KTable updates when several messages with the same key arrive at the same time


I have an input stream that I use to create a KTable. Then I create an output stream from the KTable changelog, using the toStream() method. The problem is that the stream created by toStream() does not contain all the messages from the input stream that have updated my KTable. Here is my code:

final KTable<String, event> kTable = inputStream
        .groupByKey()
        .aggregate(() -> null,            // initializer
                   aggregateKtableMethod, // aggregator
                   storageConf);          // state store / Materialized configuration

KStream<String, event> outputStream = kTable.toStream();

I would like to get one message in the outputStream for each message in the inputStream. For most messages it works well, but I am losing some events in a particular case: if I receive two messages with the same key within a small interval of time (less than 5 seconds), I only receive the second event in the outputStream.

I think this is because the KTable updates are batched somehow, but I can't find any configuration or documentation related to it. Is this the reason for the missing events, and do you know how to change the configuration so that I don't lose any messages?


Solution

  • I found the solution. The issue was in the "storageConf" I used to create my KTable: caching was enabled. I just had to disable it with this method:

    storageConf.withCachingDisabled();
    
    final KTable<String, event> kTable = inputStream
            .groupByKey()
            .aggregate(() -> null,
                       aggregateKtableMethod,
                       storageConf);
    

    Now I have all my events in the output stream.
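For context, the KTable record cache deduplicates updates per key in memory and only forwards the latest value per key downstream when the cache is flushed (on commit or when the cache fills up), which is why two quick same-key updates were collapsed into one changelog record. Besides calling withCachingDisabled() on the Materialized configuration, caching can also be turned off globally for the whole application. A sketch, assuming the pre-3.4 property name cache.max.bytes.buffering (newer releases rename it to statestore.cache.max.bytes); the application id and bootstrap server are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Config-only sketch: disable the record cache application-wide.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
// 0 bytes of cache => every KTable update is forwarded downstream immediately
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```

Note the trade-off: with caching disabled, every input record produces a downstream record, so the changelog stream (and any downstream topics) will carry more traffic than with the cache enabled.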