
How to query the state store in the Kafka Streams DSL to implement consumer idempotency


I'm working on a scenario where duplicate messages can arrive at a consumer (a KStream application). To take the typical case, suppose it's an OrderCreatedEvent and the KStream has logic that processes the order. The event carries an order-id that lets me identify duplicate messages.

What I want to do is:

1) Add every order to a persistent state store

2) When processing a message in the KStream, query the state store to check whether the message has already been received and, if so, do nothing.
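The two steps can be modelled without Kafka in a few lines. This is a minimal sketch, not Kafka Streams code. A plain `MutableMap` stands in for the persistent `KeyValueStore`, and `Order`, `OrderDeduplicator` and `accept` are hypothetical names used only for illustration:

```kotlin
// Hypothetical order type; only the id matters for deduplication.
data class Order(val id: Int, val item: String)

// Stand-in for a persistent KeyValueStore<Int, Order>: a plain in-memory map.
class OrderDeduplicator(private val store: MutableMap<Int, Order> = mutableMapOf()) {

    // Step 2: a record is a duplicate if its id is already in the store.
    fun isDuplicate(order: Order): Boolean = store[order.id] != null

    // Step 1: remember the order so that later copies are recognised.
    fun register(order: Order) {
        store[order.id] = order
    }

    // Returns true if the order should be processed, false if it is a duplicate.
    fun accept(order: Order): Boolean {
        if (isDuplicate(order)) return false
        register(order)
        return true
    }
}
```

Note that `accept` registers the order before any processing happens. That ordering is exactly what the second question below is about.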

        val persistentKeyValueStore = Stores.persistentKeyValueStore("order-store")

        val stateStore: Materialized<Int, Order, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Order>(persistentKeyValueStore)
                        .withKeySerde(intSerde)
                        .withValueSerde(orderSerde)

        val orderTable: KTable<Int, Order> = input.groupByKey(Serialized.with(intSerde, orderSerde))
                .reduce({ _, y -> y }, stateStore)

        var orderStream: KStream<Int, Order> = ...

        orderStream.filter { XXX }
                   .map { key, value -> 
                      processingLogic()
                      KeyValue(key, value)
                   }...

In the filter { XXX } part I would like to query the state store and check whether the order id is already there (let's assume the order id is used as the key of the key-value store), filtering out orders that have already been processed (i.e. that are present in the state store).
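In the snippet above, `groupByKey().reduce({ _, y -> y }, stateStore)` keeps only the most recent value for each key. The first value for a key becomes the aggregate as-is, and each later value replaces it. Here is a plain-Kotlin sketch of that "last value wins" aggregation over a finite list of records. `reduceByKey` is a hypothetical helper, not part of the Kafka API:

```kotlin
// Models groupByKey().reduce(reducer) over a finite list of (key, value) records.
fun <K, V> reduceByKey(records: List<Pair<K, V>>, reducer: (V, V) -> V): Map<K, V> {
    val table = LinkedHashMap<K, V>()
    for ((key, value) in records) {
        val current = table[key]
        // The first value for a key is stored as-is; later values are
        // combined with the current aggregate via the reducer.
        table[key] = if (current == null) value else reducer(current, value)
    }
    return table
}
```

With the reducer `{ _, y -> y }` from the question, the table for the records `1 to "a", 2 to "b", 1 to "c"` ends up holding `"c"` for key 1 and `"b"` for key 2.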

My first question is: how can I query a state store from the Kafka Streams DSL, e.g. inside the filter operation?

Second question: in this case, how can I handle the arrival of a new (not previously processed) message? If the KTable persists the order to the state store BEFORE the orderStream KStream runs, the message will already be in the store. Orders should be added only after processing has completed. How can I do this? I probably shouldn't be using a KTable for this, but something like:

           orderStream.filter { keystore.get(key) == null }
                   .map { key, value -> 
                       processingLogic()
                       KeyValue(key, value)
                   }
                   .foreach { key, value -> 
                       keystore.put(key, value)
                   }
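Here is a plain-Kotlin sketch of the ordering asked about here. It assumes a synchronous, single-threaded handler and uses a `MutableMap` in place of the key-value store. `AfterProcessingDeduplicator` and `handle` are hypothetical names. The key is written only after the processing function returns. If processing throws, the record is not marked as seen, so a redelivered copy is processed again:

```kotlin
// Hypothetical helper: check, process, then register, in that order.
class AfterProcessingDeduplicator<K, V>(
    private val store: MutableMap<K, V> = mutableMapOf()
) {
    // Runs processingLogic only for unseen keys and records the key afterwards.
    // Returns true if the record was processed, false if it was skipped.
    fun handle(key: K, value: V, processingLogic: (K, V) -> Unit): Boolean {
        if (store.containsKey(key)) return false // already processed: skip
        processingLogic(key, value)              // may throw; store is untouched then
        store[key] = value                       // register only after success
        return true
    }
}
```

In a Kafka Streams topology, the lookup and the `put` have to happen inside a processor that has the store attached, such as a `Transformer` passed to `transform()` together with the store name. A plain `filter` or `foreach` cannot reach a state store.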

Solution

  • Following Matthias' suggestions, I implemented it like this:

    DeduplicationTransformer

    package com.codependent.outboxpattern.operations.stream
    
    import com.codependent.outboxpattern.account.TransferEmitted
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.Transformer
    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore
    import org.slf4j.LoggerFactory
    
    
    @Suppress("UNCHECKED_CAST")
    class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {
    
        private val logger = LoggerFactory.getLogger(javaClass)
        private lateinit var dedupStore: KeyValueStore<String, String>
        private lateinit var context: ProcessorContext
    
        override fun init(context: ProcessorContext) {
            this.context = context
            dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, String>
        }
    
        override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
            return if (isDuplicate(key)) {
                logger.warn("****** Detected duplicated transfer {}", key)
                null
            } else {
                logger.warn("****** Registering transfer {}", key)
                dedupStore.put(key, key)
                KeyValue(key, value)
            }
        }
    
        private fun isDuplicate(key: String) = dedupStore[key] != null
    
        override fun close() {
        }
    }
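The following plain-Kotlin sketch shows what the transformer does to a sequence of records. A `MutableMap` stands in for the `dedup-store`, the `TransferEmitted` payload is replaced by a hypothetical `String`, and `DedupLogic` and `deduplicate` are illustrative names. Returning `null` from `transform()` means no record is forwarded downstream, and `mapNotNull` mirrors that here:

```kotlin
// Mirrors DeduplicationTransformer.transform(): null means "forward nothing".
class DedupLogic(private val dedupStore: MutableMap<String, String> = mutableMapOf()) {
    fun transform(key: String, value: String): Pair<String, String>? =
        if (dedupStore[key] != null) {
            null                    // duplicate: drop the record
        } else {
            dedupStore[key] = key   // register the key, as the original does
            key to value
        }
}

// Runs every record through the logic and keeps only the forwarded ones.
fun deduplicate(records: List<Pair<String, String>>): List<Pair<String, String>> {
    val logic = DedupLogic()
    return records.mapNotNull { (k, v) -> logic.transform(k, v) }
}
```

For the input `"t1" to "a", "t2" to "b", "t1" to "a-again"`, only the first two records come out.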
    

    FraudKafkaStreamsConfiguration

    const val DEDUP_STORE = "dedup-store"
    
    @Suppress("UNCHECKED_CAST")
    @EnableBinding(TransferKafkaStreamsProcessor::class)
    class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {
    
        private val logger = LoggerFactory.getLogger(javaClass)
    
        @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
        @StreamListener
        @SendTo(value = ["outputKo", "outputOk"])
        fun process(@Input("input") input: KStream<String, TransferEmitted>): Array<KStream<String, *>>? {
            val fork: Array<KStream<String, *>> = input
                    .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                    .branch(Predicate { _: String, value -> fraudDetectionService.isFraudulent(value) },
                            Predicate { _: String, value -> !fraudDetectionService.isFraudulent(value) }) as Array<KStream<String, *>>
                     ...
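`branch()` evaluates the predicates in order and sends each record to the stream of the first predicate that matches. Records matching no predicate are dropped. Because the second predicate is the negation of the first, `isFraudulent` runs twice for every legitimate transfer.

Below is a plain-Kotlin sketch of those routing rules. `branchFirstMatch` is a hypothetical helper, not the Kafka API. It suggests that, if `isFraudulent` is deterministic, a catch-all second predicate (e.g. `Predicate { _: String, _ -> true }`) gives the same split with one call per record:

```kotlin
// Models KStream#branch routing: each record goes to the output of the first
// matching predicate; records that match no predicate are dropped.
fun <T> branchFirstMatch(records: List<T>, vararg predicates: (T) -> Boolean): List<List<T>> {
    val outputs = List(predicates.size) { mutableListOf<T>() }
    for (record in records) {
        val index = predicates.indexOfFirst { it(record) }
        if (index >= 0) outputs[index].add(record)
    }
    return outputs
}
```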