Search code examples
javaapache-kafkaapache-kafka-streams

Streams Processor key-value types


I'm trying to implement a custom topology processing step implementing the Processor interface and then adding an instance of my custom processor to the topology via KStream.process, however, this always returns void and only allows Processor<KIn,VIn,Void,Void> but I would like to add more processing steps after. My doubts are:

  1. Can another processing step be added after KStream.process?
  2. How should a custom processor with specific output types be used?

Solution

  • This was recently addressed by KIP-820 (in this PR), by changing the signatures from:

    void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)
    

    to

    KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
    

    It'll be available in version 3.3 from the looks of things