apache-kafka apache-kafka-streams

Kafka Stream fixed window not grouping by key


I have a single Kafka stream. How can I accumulate messages for a fixed time window, irrespective of the key?

My use case is to write a file every 10 minutes from the stream, regardless of the keys.


Solution

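  • You'll need to use a Transformer with a state store, and schedule a punctuation that goes through the store every 10 minutes, forwards the collected records, and then removes them from the store so they are not emitted again. Because the records are being collected in the state store, transform() returns null; Kafka Streams forwards nothing when a Transformer returns null, so the filter after the transformer is only a defensive check against records with null keys or values.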

    Here's a quick example of something I think is close to what you are asking for. Let me know how it goes.

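    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.TransformerSupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    class WindowedTransformerExample {

      public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final String stateStoreName = "stateStore";
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName),
                Serdes.String(),
                Serdes.String());

        builder.addStateStore(keyValueStoreBuilder);

        builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .transform(new WindowedTransformer(stateStoreName), stateStoreName)
            // transform() forwards nothing when it returns null; this filter only
            // guards against null keys/values in the records emitted by the punctuator
            .filter((k, v) -> k != null && v != null)
            // Here's where you do something with the records emitted every 10 minutes
            .foreach((k, v) -> System.out.println(k + " -> " + v));

        // Placeholder application id and broker address; adjust for your cluster
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-transformer-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
      }


      static final class WindowedTransformer implements TransformerSupplier<String, String, KeyValue<String, String>> {

        private final String storeName;

        public WindowedTransformer(final String storeName) {
          this.storeName = storeName;
        }

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
          return new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> keyValueStore;
            private ProcessorContext processorContext;

            @Override
            public void init(final ProcessorContext context) {
              processorContext = context;
              keyValueStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
              // Every 10 minutes, forward everything collected so far and empty the store.
              // Could change this to PunctuationType.STREAM_TIME if needed.
              context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, ts -> {
                final List<String> emittedKeys = new ArrayList<>();
                try (final KeyValueIterator<String, String> iterator = keyValueStore.all()) {
                  while (iterator.hasNext()) {
                    final KeyValue<String, String> keyValue = iterator.next();
                    processorContext.forward(keyValue.key, keyValue.value);
                    emittedKeys.add(keyValue.key);
                  }
                }
                // Delete only after the iterator is closed; without this step the
                // same records would be re-emitted on every punctuation.
                for (final String key : emittedKeys) {
                  keyValueStore.delete(key);
                }
              });
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
              // The store is keyed by the record key, so put() overwrites: only the
              // latest value per key survives until the next punctuation.
              // Records with a null key are skipped.
              if (key != null) {
                keyValueStore.put(key, value);
              }
              // Nothing is forwarded here; records leave through the punctuator.
              return null;
            }

            @Override
            public void close() {
              // Nothing to clean up; Kafka Streams manages the state store.
            }
          };
        }
      }
    }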
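The transformer above stores records by key, so a later record with the same key replaces an earlier one, while the question asks to accumulate every message regardless of key and write a file every 10 minutes. One possible variation is to buffer values without using the key at all and write the file directly from the punctuator. The plain-Java sketch below, independent of Kafka Streams, shows only that buffer-and-flush logic under stated assumptions: the class name `KeyAgnosticBatcher`, the `batch-<timestamp>.txt` file naming, and the one-value-per-line format are all hypothetical choices for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects values for one window regardless of key
// and writes them to a file when the window closes.
final class KeyAgnosticBatcher {
    private final List<String> buffer = new ArrayList<>();

    // Called for every incoming record; the key is deliberately ignored,
    // so records sharing a key (or having a null key) are all kept.
    void add(String key, String value) {
        buffer.add(value);
    }

    int size() {
        return buffer.size();
    }

    // Called once per window (e.g. from a 10-minute punctuator).
    // Writes every buffered value, one per line, to batch-<timestamp>.txt
    // in dir, then clears the buffer so the next window starts empty.
    // Returns null when nothing was buffered, so empty windows write no file.
    Path flush(Path dir, long windowTimestamp) throws IOException {
        if (buffer.isEmpty()) {
            return null;
        }
        Path file = dir.resolve("batch-" + windowTimestamp + ".txt");
        Files.write(file, buffer, StandardCharsets.UTF_8);
        buffer.clear();
        return file;
    }

    // Small demo: three records, two sharing a key and one with a null key.
    public static void main(String[] args) throws IOException {
        KeyAgnosticBatcher batcher = new KeyAgnosticBatcher();
        batcher.add("user-1", "first");
        batcher.add("user-1", "second"); // same key: both values are kept
        batcher.add(null, "third");      // null key: still kept
        Path file = batcher.flush(Files.createTempDirectory("batches"), System.currentTimeMillis());
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        System.out.println("wrote " + lines.size() + " lines to " + file);
    }
}
```

In a Kafka Streams application, `add()` would be called from `transform()` and `flush()` from the punctuator callback, passing the punctuation timestamp. Keep in mind that a plain in-memory list is not backed by a changelog the way a state store is, so values buffered at the moment of a crash would be lost; if that matters, one option is to keep using the state store but key entries by something unique per record instead of the message key.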