apache-kafka, apache-kafka-streams

In a Kafka Streams application, the second output stream is no longer written


I am currently implementing a Kafka Streams application that reads one topic and does some processing. During processing I split the data into two streams: one is written to a topic as Avro records (with a schema), the other is a word-count aggregation that writes key/value pairs (String/Long) to a different topic. The code worked fine before, but recently the second stream is no longer written.

In this code example:

// map each input record to a ProcessedSentence (mapping logic elided)
KStream<String, ProcessedSentence> sentenceKStream = stream
        .map((k, v) -> {
                [...]
        });

// configure serdes for publishing to the output topic
// (serdeConfig holds the Schema Registry settings; stringSerde is defined elsewhere)
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false); // false = value serde
stringSerde.configure(serdeConfig, true);             // true  = key serde

// write the sentence stream as specific Avro records
sentenceKStream
        .to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        valueProcessedSentence));
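
As an aside, the serdeConfig passed to configure(...) above is presumably just a map pointing the Avro serde at the Schema Registry; a minimal sketch (the registry URL is a placeholder) might look like:

import java.util.Collections;
import java.util.Map;

// minimal serde configuration: only the (hypothetical) Schema Registry address
final Map<String, String> serdeConfig =
        Collections.singletonMap("schema.registry.url", "http://localhost:8081");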

the stream of sentences (sentenceKStream) is written correctly, but the problem arises with the word-count grouping:

KStream<String, Long> wordCountKStream =
        sentenceKStream.flatMap((key, processedSentence) -> {
            // explode each sentence into one (word, count) pair per word
            List<KeyValue<String, Long>> result = new LinkedList<>();
            for (Map.Entry<CharSequence, Long> word : processedSentence.getWords().entrySet()) {
                result.add(KeyValue.pair(word.getKey().toString(), word.getValue()));
            }
            return result;
        })
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)  // sum the counts per word
        .toStream();

// write the word counts (String/Long) to the second output topic
wordCountKStream
        .to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        Serdes.Long()));

I really don't get why the wordCountKStream is no longer written.

Maybe somebody could provide some help? I'd be happy to provide any further details!

Many thanks!

Update: I found out that the data is missing from both new output topics. Everything is actually written correctly, but a couple of minutes after being written the data is deleted from both topics (0 bytes left).


Solution

  • It had nothing to do with the implementation itself. I just deleted all topic offsets using

    kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]
    

    which solved the problem. There had simply been an issue with the stored offsets that conflicted with the multiple restarts of the streams application during debugging.

    For those who want to list the consumer groups in order to find the stored offsets, call

    kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
    

    Update: Unfortunately, deleting the group offsets did not fix it either. The actual problem was that the timestamps of the new entries in the output topics were taken over from the original (consumed) topic and did not change at all. The new entries therefore carried timestamps older than the default retention time.
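
    One way around this, if rewriting record timestamps is acceptable (my own suggestion, not part of the original fix), is to have the application stamp records with wall-clock time instead of the timestamp inherited from the consumed topic, using the built-in WallclockTimestampExtractor, so that produced records fall inside the retention window. Note that this changes the time semantics of any windowed operations:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    Properties props = new Properties();
    // stamp records with processing (wall-clock) time instead of the
    // timestamp inherited from the consumed topic
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class);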

    As the consumed topic had a retention.ms of -1 (keep data forever) while the new topics used the broker default (7 days, if I remember correctly), the entries in the consumed topic were still there, but the ones in the produced topics were always deleted because they were older than the retention window.
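
    A topic's retention.ms override can be checked with kafka-configs.sh (topic name is a placeholder; on newer Kafka versions, add --all to also show the defaults):

    kafka-configs.sh --bootstrap-server node1:9092 --describe --entity-type topics --entity-name [topic_name]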

    The final solution (for me) was to set retention.ms to -1 ("keep forever") for the output topics as well, which is probably not the best solution for a production environment.
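
    For reference, the change can be applied per topic with kafka-configs.sh:

    kafka-configs.sh --bootstrap-server node1:9092 --alter --entity-type topics --entity-name [topic_name] --add-config retention.ms=-1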

    Hint: for Streams applications it is recommended to use the Application Reset Tool instead of manually resetting/deleting the offsets as shown above.
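
    A minimal invocation might look like this (application id and topic are placeholders; older Kafka versions spell the first flag --bootstrap-servers):

    kafka-streams-application-reset.sh --bootstrap-server node1:9092 --application-id [application_id] --input-topics [topic_name]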