apache-kafka-streams

Kafka Streams - override default addSink implementation / custom producer


This is my first post here, and I am not sure if this has been covered before, but here goes: I have a Kafka Streams application using the Processor API, with the following topology:

1. Consume data from an input topic (processor.addSource())
2. Insert data into a DB (processor.addProcessor())
3. Produce its processing status to an output topic (processor.addSink())
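For reference, the three steps above map onto the Processor API roughly as follows. This is a minimal sketch, not the original application: it assumes String keys and values, the `org.apache.kafka.streams.processor.api` interfaces (Kafka Streams 2.7+), and the hypothetical names `TopologySketch`, `DbWriterProcessor` and `MY-INPUT-TOPIC`. The database call is left as a placeholder comment.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class TopologySketch {

    // Builds Source -> DbWriter -> Sink, mirroring steps 1-3.
    static Topology build() {
        Topology topology = new Topology();

        // 1. Consume from the input topic (hypothetical name); String keys/values assumed.
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                "MY-INPUT-TOPIC");

        // 2. Write to the DB. A typed supplier variable avoids overload ambiguity
        //    with the older, deprecated addProcessor() variant on 2.7-3.x.
        ProcessorSupplier<String, String, String, String> dbWriter = DbWriterProcessor::new;
        topology.addProcessor("DbWriter", dbWriter, "Source");

        // 3. Produce the processing status to the output topic.
        topology.addSink("Sink", "MY-OUTPUT-TOPIC",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "DbWriter");
        return topology;
    }

    // Hypothetical processor: performs the DB insert, then forwards a status value.
    static class DbWriterProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Placeholder: insert record.value() into the database here.
            String status = "SUCCESS";
            context.forward(record.withValue(status)); // same key, status as the value
        }
    }
}
```

The records forwarded to the "Sink" node are what the Streams-managed producer ultimately sends to MY-OUTPUT-TOPIC.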

The app works great. However, for traceability purposes, I need the logs to record the moment Kafka Streams produces a message to the output topic, along with its RecordMetadata (topic, partition, offset).

Example below:

KEY="MY_KEY" OUTPUT_TOPIC="MY-OUTPUT-TOPIC" PARTITION="1" OFFSET="1000" STATUS="SUCCESS"

I am not sure whether there is a way to override the default Kafka Streams producer to add this logging, or whether I should create my own producer and plug it into the addSink() step. I partially achieved it by implementing my own ProductionExceptionHandler (default.production.exception.handler), but that only covers failed sends.

Thanks in advance,

Guilherme


Solution

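  • If you configure the Streams application to use a ProducerInterceptor, you should be able to get the information you need. Specifically, implementing onAcknowledgement(RecordMetadata, Exception) gives you the topic, partition and offset of each acknowledged record, plus the exception if the send failed. It does not receive the record key; for that, use onSend(ProducerRecord), which sees the full record before it is sent.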

    To configure interceptors in a streams application:

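    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // add this configuration in addition to your other Streams configs
    props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
              Collections.singletonList(MyProducerInterceptor.class));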

    You can register more than one interceptor if desired: add each class to the list and use a regular List (for example, Arrays.asList(...)) instead of the singleton. Interceptors are invoked in the order in which their classes appear in the list.

    EDIT: To be clear, you can also replace the Producer that Kafka Streams uses by implementing the KafkaClientSupplier interface and passing it to the overloaded KafkaStreams constructor. IMHO, though, an interceptor is the cleaner approach; which direction to take is up to you.
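A minimal sketch of the `MyProducerInterceptor` class referenced in the configuration follows. It assumes UTF-8 String keys and a single sink topic named `MY-OUTPUT-TOPIC`; the `format()` helper and the log layout are hypothetical. Kafka Streams' internal producer works on serialized `byte[]` keys and values, so the interceptor is typed `ProducerInterceptor<byte[], byte[]>`. Because `onAcknowledgement()` receives only the `RecordMetadata` and an exception, the sketch logs the key in `onSend()` and the topic, partition, offset and status in `onAcknowledgement()`; correlating the two lines is left open. It uses `java.util.logging` only to avoid extra dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerInterceptor implements ProducerInterceptor<byte[], byte[]> {

    private static final Logger LOG = Logger.getLogger(MyProducerInterceptor.class.getName());

    // Assumption: only the sink topic matters. The same producer also writes to
    // any changelog/repartition topics the topology has, so filter by topic.
    static final String OUTPUT_TOPIC = "MY-OUTPUT-TOPIC";

    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        if (OUTPUT_TOPIC.equals(record.topic())) {
            // Assumes keys were serialized as UTF-8 strings (e.g. Serdes.String()).
            String key = record.key() == null ? "null" : new String(record.key(), StandardCharsets.UTF_8);
            LOG.info("SENDING KEY=\"" + key + "\" OUTPUT_TOPIC=\"" + record.topic() + "\"");
        }
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Usually runs on the producer's I/O thread, so keep this cheap.
        // metadata can be null in rare cases, per the ProducerInterceptor Javadoc.
        if (metadata == null || !OUTPUT_TOPIC.equals(metadata.topic())) {
            return;
        }
        LOG.info(format(metadata.topic(), metadata.partition(), metadata.offset(), exception));
    }

    // Hypothetical helper producing a line in the question's KEY=... style (minus the key).
    static String format(String topic, int partition, long offset, Exception exception) {
        String status = exception == null ? "SUCCESS" : "FAILURE";
        return "OUTPUT_TOPIC=\"" + topic + "\" PARTITION=\"" + partition
                + "\" OFFSET=\"" + offset + "\" STATUS=\"" + status + "\"";
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // receives the producer configs; unused in this sketch
    }
}
```

Kafka instantiates interceptors reflectively through a no-argument constructor, so the class needs to be public and on the application's classpath.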