Please help me understand Kafka Streams. I have code that does a plain groupByKey() followed by reduce():
    public static void createGroupByStream(final StreamsBuilder builder) {
        KStream<String, Person> stream = builder.stream(
                "kafka-stream-input",
                Consumed.with(Serdes.String(), CustomSerdes.PersonSerde()));
        stream
            .groupByKey()
            .reduce((item, second) -> {
                item.setCount(item.getCount() + second.getCount());
                return item;
            })
            .toStream()
            .to("kafka-stream-output", Produced.with(Serdes.String(), CustomSerdes.PersonSerde()));
    }
class Person:
    public class Person extends GroupingEntity {
        private String name;
        private int count;
    }
class CustomSerdes:
    public final class CustomSerdes {
        public static Serde<Person> PersonSerde() {
            JsonSerializer<Person> serializer = new JsonSerializer<>();
            JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class);
            return Serdes.serdeFrom(serializer, deserializer);
        }
    }
At the moment, the data is updated as a stream, i.e. the "kafka-stream-output" topic stores the entire history of updates to the count field for each Person. I want the output topic (in the example, "kafka-stream-output") to receive a record only once all messages from the "kafka-stream-input" topic have been processed. That is, only the final result of the groupBy should be written. Can you suggest how to do this?
The data comes from a CSV file, where each line is one message. There may be a large number of lines (several million).
In theory, I could count the number of rows (*count*) up front and wait until *count* messages from the topic have been processed. But how do I do this programmatically?
As indicated in the previous answer, Kafka Streams doesn't have a concept of "done" since event streams are considered infinite. One thing to consider is using windowing with suppression and only emitting a result when the window closes.
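A minimal sketch of the windowing-with-suppression idea, reusing the topic names, `Person`, and `CustomSerdes` from the question. The class name, the method name, and the one-hour window size are assumptions made for illustration, and `TimeWindows.ofSizeWithNoGrace` assumes Kafka Streams 3.0 or newer (on older versions, `TimeWindows.of(...).grace(Duration.ZERO)` plays the same role):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public final class SuppressedGroupBy {

    // Assumption: one hour is longer than the time span covered by the
    // timestamps of the records produced from a single CSV file.
    private static final Duration WINDOW_SIZE = Duration.ofHours(1);

    public static void createSuppressedGroupByStream(final StreamsBuilder builder) {
        builder
            .stream("kafka-stream-input",
                    Consumed.with(Serdes.String(), CustomSerdes.PersonSerde()))
            .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.PersonSerde()))
            // Aggregate per key within a tumbling window instead of forever.
            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
            .reduce((item, second) -> {
                        item.setCount(item.getCount() + second.getCount());
                        return item;
                    },
                    Materialized.with(Serdes.String(), CustomSerdes.PersonSerde()))
            // Hold back intermediate updates; forward one result per key
            // and window only after the window has closed.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Drop the window from the key so the output key is a plain String again.
            .selectKey((windowedKey, person) -> windowedKey.key())
            .to("kafka-stream-output",
                Produced.with(Serdes.String(), CustomSerdes.PersonSerde()));
    }
}
```

Two caveats with this approach. A window closes based on stream time, which only advances when new records arrive, so the final result is not emitted until a record with a timestamp past the end of the window shows up on the input topic. And if the records from one file straddle a window boundary, each key gets one result per window rather than a single total.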