apache-kafka-streams

How to handle duplicate messages using Kafka Streams DSL functions


My requirement is to skip or avoid duplicate messages (having the same key) received from the INPUT topic using the Kafka Streams DSL API.

There is a possibility of the source system sending duplicate messages to the INPUT topic in case of failures.

FLOW -

Source System --> INPUT Topic --> Kafka Streaming --> OUTPUT Topic

Currently I am using flatMap to generate multiple keys out of the payload, but flatMap is stateless, so it cannot avoid duplicate message processing upon receiving records from the INPUT topic.
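
For illustration, a minimal sketch of the stateless fan-out described here (topic names and key derivation are hypothetical):

    import java.util.Arrays;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> input = builder.stream("INPUT");

    // flatMap can fan one record out into multiple key/value pairs, but it is
    // stateless: it has no memory of earlier records, so duplicates pass through.
    input
        .flatMap((key, value) -> Arrays.asList(
            KeyValue.pair(key + "-a", value),
            KeyValue.pair(key + "-b", value)))
        .to("OUTPUT");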

I am looking for a DSL API that can skip duplicate records received from the INPUT topic and also generate multiple key/value pairs before sending them to the OUTPUT topic.

I thought the exactly-once configuration would be useful here to deduplicate messages received from the INPUT topic based on keys, but it does not seem to work; probably I have not understood the usage of exactly-once correctly.

Could you please shed some light on this?


Solution

  • My requirement is to skip or avoid duplicate messages (having the same key) received from the INPUT topic using the Kafka Streams DSL API.

    Take a look at the EventDeduplication example at https://github.com/confluentinc/kafka-streams-examples, which does that. You can then adapt the example with the required flatMap functionality that is specific to your use case.

    Here's the gist of the example:

    final KStream<byte[], String> input = builder.stream(inputTopic);
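    // Note: `windowSize` and `storeName` are defined elsewhere in the example; the
    // window store must be registered with the builder before `transform()` can
    // look it up by name (see the wiring sketch further below).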
    final KStream<byte[], String> deduplicated = input.transform(
        // In this example, we assume that the record value as-is represents a unique event ID by
        // which we can perform de-duplication.  If your records are different, adapt the extractor
        // function as needed.
        () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
        storeName);
    deduplicated.to(outputTopic);
    

    and the DeduplicationTransformer itself:

      private static class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {

        private ProcessorContext context;

        /**
         * Key: event ID
         * Value: timestamp (event-time) of the corresponding event when the event ID was seen for
         * the first time
         */
        private WindowStore<E, Long> eventIdStore;

        private final long leftDurationMs;
        private final long rightDurationMs;

        private final KeyValueMapper<K, V, E> idExtractor;

        /**
         * @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
         *                                     ID), during the time of which any incoming duplicates of
         *                                     the event will be dropped, thereby de-duplicating the
         *                                     input.
         * @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
         *                    records; if it returns null, the record will not be considered for
         *                    de-duping but forwarded as-is.
         */
        DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
          if (maintainDurationPerEventInMs < 1) {
            throw new IllegalArgumentException("maintain duration per event must be >= 1");
          }
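          // The retention period is split around each event's timestamp: lookups scan
          // [eventTime - leftDurationMs, eventTime + rightDurationMs] in the window store,
          // so duplicates arriving slightly earlier or later in event time are still caught.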
          leftDurationMs = maintainDurationPerEventInMs / 2;
          rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
          this.idExtractor = idExtractor;
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
          this.context = context;
          eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
        }
    
        @Override
        public KeyValue<K, V> transform(final K key, final V value) {
          final E eventId = idExtractor.apply(key, value);
          if (eventId == null) {
            return KeyValue.pair(key, value);
          } else {
            final KeyValue<K, V> output;
            if (isDuplicate(eventId)) {
              output = null;
              updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
            } else {
              output = KeyValue.pair(key, value);
              rememberNewEvent(eventId, context.timestamp());
            }
            return output;
          }
        }
    
        private boolean isDuplicate(final E eventId) {
          final long eventTime = context.timestamp();
          final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
              eventId,
              eventTime - leftDurationMs,
              eventTime + rightDurationMs);
          final boolean isDuplicate = timeIterator.hasNext();
          timeIterator.close();
          return isDuplicate;
        }
    
        private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
          eventIdStore.put(eventId, newTimestamp, newTimestamp);
        }
    
        private void rememberNewEvent(final E eventId, final long timestamp) {
          eventIdStore.put(eventId, timestamp, timestamp);
        }
    
        @Override
        public void close() {
          // Note: The store should NOT be closed manually here via `eventIdStore.close()`!
          // The Kafka Streams API will automatically close stores when necessary.
        }
    
      }
    

    I am looking for a DSL API that can skip duplicate records received from the INPUT topic and also generate multiple key/value pairs before sending them to the OUTPUT topic.

    The DSL doesn't include such functionality out of the box, but the example above shows how you can build your own de-duplication logic by combining the DSL with the Processor API of Kafka Streams, using a Transformer.
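
    For completeness, here is a rough sketch of how the pieces could be wired together: the windowed state store must be registered with the StreamsBuilder before transform() can look it up by name, and the flatMap fan-out can then run on the de-duplicated stream. The store name, window size, serdes, and fan-out logic below are illustrative assumptions, not part of the original example:

        import java.time.Duration;
        import java.util.Arrays;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.state.StoreBuilder;
        import org.apache.kafka.streams.state.Stores;
        import org.apache.kafka.streams.state.WindowStore;

        final String storeName = "eventId-store";            // assumed store name
        final Duration windowSize = Duration.ofMinutes(10);  // assumed de-duplication window

        final StreamsBuilder builder = new StreamsBuilder();

        // Register the window store that DeduplicationTransformer#init() fetches by name.
        final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder =
            Stores.windowStoreBuilder(
                Stores.persistentWindowStore(storeName, windowSize, windowSize, false),
                Serdes.String(),
                Serdes.Long());
        builder.addStateStore(dedupStoreBuilder);

        final KStream<String, String> input = builder.stream("INPUT");

        // De-duplicate first (stateful), then fan out key/values (stateless).
        final KStream<String, String> deduplicated = input.transform(
            () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
            storeName);

        deduplicated
            .flatMap((key, value) -> Arrays.asList(  // hypothetical fan-out logic
                KeyValue.pair(key + "-a", value),
                KeyValue.pair(key + "-b", value)))
            .to("OUTPUT");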

    I thought the exactly-once configuration would be useful here to deduplicate messages received from the INPUT topic based on keys, but it does not seem to work; probably I have not understood the usage of exactly-once correctly.

    As Matthias J. Sax mentioned in his answer, from Kafka's perspective these "duplicates" are not duplicates under its exactly-once processing semantics. Kafka ensures that it will not introduce any such duplicates itself, but it cannot make such decisions out of the box for upstream data sources, which are a black box to Kafka.
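
    To make the distinction concrete, exactly-once processing is enabled via a single configuration setting; it prevents duplicates that Kafka Streams itself could otherwise introduce (e.g., through producer retries or reprocessing after a failure), but it does not filter records that were already written twice to the INPUT topic. A minimal sketch, assuming Kafka Streams 3.0+ where the exactly_once_v2 guarantee is available:

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-app");          // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker
        // Guards against duplicates introduced by the Streams app itself (retries,
        // failover), not against duplicates the source system wrote to INPUT.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);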