apache-kafka kafka-consumer-api apache-kafka-streams sliding-window

How to implement a sliding window with Kafka Streams without a key?


I want to do real-time aggregation with Kafka Streams. However, the records in my topic have no key; only the value is stored. The value is JSON, for example:

{ "member_no" : "123", "item_no": "item_123", "category_no": "Category_123", "order_no": "Order_123", "datetime": "2022-1-11-09 11:00:00" }

In this case, I want to count "member_no" per "item_no" over 5 minutes using a sliding window. In SQL terms: select item_no, count(member_no) from table group by item_no

The Kafka Streams application is written in Java using Spring (STS).

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CurrentTimestampExtractor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(ORIGIN_STREAM);

        KTable<Windowed<String>, Long> Sliding_Stream = stream
                .groupBy((key, value) -> value.item_no)
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(5)))
                .count();

        // I don't know how to write the grouping above.
        // Most of the examples I have found assume that the records already have a key.
        // I'm not good at Java, so please bear with me.

        Sliding_Stream.toStream().to(TARGET_STREAM);

        @SuppressWarnings("resource")
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }

Please help. Thank you.

I have read that groupBy is inefficient because it triggers a repartitioning operation, and that groupByKey is recommended for that reason. Would it be better to create a key, write the records to a new topic, and work from that topic?


Solution

  • As the javadoc for StreamsBuilder.stream puts it:

    Note that the specified input topic must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

    So, in principle, what you are doing will work. Whether you call selectKey and then groupByKey, or call groupBy directly, makes no difference. Either way the key changes, so the next operation that needs the data partitioned by key (such as the aggregation that you want to perform) will repartition the stream.

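    In your example code, you just need to work out serialization / deserialization, and it should work. The value arrives as a JSON string, so item_no has to be read by parsing that string, and the aggregation result has a Windowed<String> key and a Long value, which the default String serdes cannot write to the output topic.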
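
As an illustration of the points above, here is a minimal sketch of the topology, not a definitive implementation. It assumes Kafka Streams 3.x and Jackson on the classpath. The class name SlidingCountSketch, the helper itemNoOf, the topic names, the application id, the broker address and the "item@start-end" output key format are all hypothetical choices made for this example. Windows are based on whatever timestamp the configured timestamp extractor returns for each record.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;

import java.time.Duration;
import java.util.Properties;

public class SlidingCountSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: reads "item_no" from the raw JSON value.
    // Returns null when the value is malformed or the field is missing.
    public static String itemNoOf(String json) {
        try {
            JsonNode item = MAPPER.readTree(json).path("item_no");
            return item.isTextual() ? item.asText() : null;
        } catch (Exception e) {
            return null;
        }
    }

    public static Topology buildTopology(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                // Derive the key from the value; this marks the stream for repartitioning.
                .selectKey((key, value) -> itemNoOf(value))
                // Drop records for which no item_no could be extracted.
                .filter((itemNo, value) -> itemNo != null)
                // selectKey + groupByKey behaves like groupBy((k, v) -> itemNoOf(v), ...).
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(5)))
                .count()
                .toStream()
                // Flatten Windowed<String> / Long into plain strings so that
                // String serdes can write them (the key format is made up here).
                .map((windowedKey, count) -> KeyValue.pair(
                        windowedKey.key() + "@" + windowedKey.window().start()
                                + "-" + windowedKey.window().end(),
                        String.valueOf(count)))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sliding-count-sketch"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // hypothetical

        KafkaStreams streams = new KafkaStreams(buildTopology("orders", "item-counts"), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Passing the serdes explicitly through Consumed, Grouped and Produced keeps the sketch independent of the default serde settings. If you would rather keep the windowed key as it is, an alternative is to write the output with a windowed serde instead of mapping it to a string.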