Search code examples
javaapache-kafkaapache-kafka-streams

Kafka Streams List Serdes : is always empty


What I am doing wrong ?

I am writing a ProcessorSupplier for aggregate n records into one. For that I am using List Serdes ...

My problem is that the ArrayList is always empty.

Using Java 21 and Kafka streams 3.7.0

public class KafkaTheBatcherProcessorApiApplication {

  public static void main(String[] args) {

    final Topology topology = new Topology();

    topology.addSource( "source-node", stringSerde.deserializer(), stringSerde.deserializer(), "inputTopic");

    topology.addProcessor("aggregate-records",
      new BatchProcessorSupplierPersistedStore(),
      "source-node");

    topology.addSink( "sink-node", "outputTopic", stringSerde.serializer(),  listSerde.serializer(), "aggregate-records");

    Properties properties = new Properties();

    try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {     
        kafkaStreams.start();
  }

}

then my Supplier

class BatchProcessorSupplierPersistedStore implements ProcessorSupplier<String, String, String, List<String>> {

  @Override
  public Set<StoreBuilder<?>> stores() {
    return Set.of(
      Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String()))     
    );
  }

  @Override
  public Processor<String, String, String, List<String>> get() {
    return new Processor<>() {

      private ProcessorContext<String, List<String>> context;
      private KeyValueStore<String, List<String>> storeList;  

      @Override
      public void init(ProcessorContext<String, List<String>> context) {
        this.context = context;
        storeList = context.getStateStore("batch-store");        
        this.context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, this::forwardAll);
      }

      private void forwardAll(final long timestamp) {
          storeList.all().forEachRemaining(entry -> {
            if (!entry.value.isEmpty()) {
              context.forward(...  
          });

      }

      @Override
      public void process(Record<String, String> record) {
          if (storeList.get(record.key()) == null) {
            storeList.put(record.key(), new ArrayList<>(10000));
          }
          storeList.get(record.key()).add(record.value());
         
      }
    };
  }
}

I added a Store counter:

Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("batch-store-counter"), Serdes.String(), Serdes.Integer())

and this one is worked as expected.


Solution

  • Your initialized list is put into the state store, but you never put the updated list into the state store