java kotlin apache-kafka-streams

How to pass a parameter to a Kafka processor


Is there a way to pass a parameter to a Kafka processor?

For example, suppose I have something like:

    private fun identifyBadPings(
        pingStream: KStream<ID, Ping>,
        mySingletonBadPingIdentifier: BadPingIdentifier
    ): KStream<ID, Ping> {

        return pingStream.transform(TransformerSupplier { BadPingsMarker() }, MY_STATE_STORE)
        // Could I somehow pass `mySingletonBadPingIdentifier` here? Like:
        // return pingStream.transform(TransformerSupplier { BadPingsMarker(mySingletonBadPingIdentifier) }, MY_STATE_STORE)
    }

where my transformer is something like this:

    class BadPingsMarker : Transformer<ID, Ping, KeyValue<ID, Ping>> {
        var state: KeyValueStore<String, Tuple<String, String>>? = null
        val logger: Logger = LogManager.getLogger(BadPingsMarker::class.java)!!
        val pingIdentifier: BadPingIdentifier

        override fun init(context: ProcessorContext, badPingIdentifier: BadPingIdentifier) {
            state = context.getStateStore(MY_STATE_STORE) as KeyValueStore<String, Tuple<String, String>>
            pingIdentifier = badPingIdentifier
        }

        override fun transform(key: ID, value: Ping): KeyValue<ID, Ping> {
            val somevalue = value.somevalue
            val state_checker = state!![MY_STATE_STORE_A]

            // .. IMPLEMENT MARKING LOGIC HERE USING pingIdentifier
            return KeyValue(key, value)
        }

        override fun close() {}
    }

Solution

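  • You can modify your class to accept a BadPingIdentifier in its constructor, like this:

    class BadPingsMarker(private val pingIdentifier: BadPingIdentifier) : Transformer<ID, Ping, KeyValue<ID, Ping>> {
        // init, transform and close go here
    }

    Then remove the badPingIdentifier parameter from the init method (the Transformer interface only declares init(context: ProcessorContext), so an extra parameter will not compile) and remove val pingIdentifier: BadPingIdentifier from the class field declarations. Once you do that, the commented-out code in identifyBadPings will work: the TransformerSupplier lambda captures the identifier and hands it to every BadPingsMarker it creates.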
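
Putting the pieces together, a minimal sketch of the constructor-injection approach might look like the following. It assumes a Kafka Streams version that still offers KStream.transform (newer releases deprecate it in favor of process). The question does not show ID, Ping, BadPingIdentifier or the state store's value type, so the definitions below, including the isBad method, the bad flag and the store name, are hypothetical stand-ins chosen only to make the sketch compile.

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical stand-ins for the question's types (not shown in the original post).
typealias ID = String
data class Ping(val somevalue: String, val bad: Boolean = false)
fun interface BadPingIdentifier {
    fun isBad(ping: Ping): Boolean // assumed method name
}

const val MY_STATE_STORE = "my-state-store" // assumed store name

class BadPingsMarker(
    private val pingIdentifier: BadPingIdentifier // injected through the constructor
) : Transformer<ID, Ping, KeyValue<ID, Ping>> {

    private lateinit var state: KeyValueStore<ID, String>

    // init keeps the single-parameter signature that the Transformer interface requires.
    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        state = context.getStateStore(MY_STATE_STORE) as KeyValueStore<ID, String>
    }

    override fun transform(key: ID, value: Ping): KeyValue<ID, Ping> {
        val bad = pingIdentifier.isBad(value)
        if (bad) state.put(key, value.somevalue) // remember the last bad ping per key
        return KeyValue(key, value.copy(bad = bad))
    }

    override fun close() {}
}

fun identifyBadPings(
    pingStream: KStream<ID, Ping>,
    badPingIdentifier: BadPingIdentifier
): KStream<ID, Ping> =
    // The lambda captures badPingIdentifier; each get() call builds a new BadPingsMarker around it.
    pingStream.transform(
        TransformerSupplier { BadPingsMarker(badPingIdentifier) },
        MY_STATE_STORE
    )
```

Note that the supplier returns a fresh BadPingsMarker on every call while all instances share the one BadPingIdentifier. Because Kafka Streams may run several transformer instances on different threads, the shared identifier should be safe to call concurrently.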