apache-kafka, apache-kafka-streams

How to pass a final groupBy() to a Kafka topic


Please help me understand Kafka Streams. I have code that does a normal groupBy():

public static void createGroupByStream(final StreamsBuilder builder) {
    KStream<String, Person> stream = builder.stream("kafka-stream-input", Consumed.with(Serdes.String(), CustomSerdes.PersonSerde()));
    stream
            .groupByKey()
            .reduce((item, second) -> {
                item.setCount(item.getCount() + second.getCount());
                return item;
            })
            .toStream()
            .to("kafka-stream-output", Produced.with(Serdes.String(), CustomSerdes.PersonSerde()));
}

class Person:

public class Person extends GroupingEntity {
    private String name;
    private int count;
}

class CustomSerdes:


public final class CustomSerdes {

    public static Serde<Person> PersonSerde() {
        JsonSerializer<Person> serializer = new JsonSerializer<>();
        JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class);
        return Serdes.serdeFrom(serializer, deserializer);
    }
}

At the moment, the data is updated as a stream, i.e. the "kafka-stream-output" topic stores the entire history of updates to the count field for each Person. I want the output topic (in the example, "kafka-stream-output") to receive data only once all messages from the "kafka-stream-input" topic have been processed, i.e. to contain only the final groupBy result. Can you suggest how to do this?

The data comes from a CSV file, where each line is one message. There may be a large number of lines (several million).

That is, in theory, I could first count the number of rows (count) and wait until count messages from the topic have been processed. But how can I do this programmatically?


Solution

  • As indicated in the previous answer, Kafka Streams doesn't have a concept of "done" since event streams are considered infinite. One thing to consider is using windowing with suppression and only emitting a result when the window closes.
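
A minimal sketch of the windowing-with-suppression idea, reusing the Person and CustomSerdes classes from the question. The class name FinalGroupByTopology, the method name, and the 5-minute window with no grace period are assumptions chosen for illustration, and TimeWindows.ofSizeWithNoGrace assumes Kafka Streams 3.0 or newer:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical class name; Person and CustomSerdes are the classes from the question.
public final class FinalGroupByTopology {

    public static void createFinalGroupByStream(final StreamsBuilder builder) {
        builder.stream("kafka-stream-input",
                        Consumed.with(Serdes.String(), CustomSerdes.PersonSerde()))
                .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.PersonSerde()))
                // Assumed window size: 5 minutes, no grace period for late records.
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                // Same reducer as in the question, now applied per key and per window.
                .reduce((item, second) -> {
                    item.setCount(item.getCount() + second.getCount());
                    return item;
                }, Materialized.with(Serdes.String(), CustomSerdes.PersonSerde()))
                // Hold back intermediate updates; emit one result per key once the window closes.
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                // Drop the window information so the output key is the plain String again.
                .selectKey((windowedKey, person) -> windowedKey.key())
                .to("kafka-stream-output",
                        Produced.with(Serdes.String(), CustomSerdes.PersonSerde()));
    }
}
```

Two caveats apply to this sketch. Suppression is driven by stream time, so a window closes only when a record with a later timestamp arrives; for a finite CSV load, the last window is not emitted until some newer record advances stream time. The counts are also per window, so the window has to be large enough to cover the timestamps of the whole file if a single total per key is wanted.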