Is there a way to pass a parameter to a Kafka processor?
E.g. Suppose I have something like:
private fun identifyBadPings(
pingStream: KStream<ID, Ping>,
mySignletonBadPingIdentifier: BadPingIdentifier
): KStream<ID, Ping> {
return pingStream.transform(TransformerSupplier { BadPingsMarker() }, MY_STATE_STORE)
// Could I somehow pass `mySignletonBadPingIdentifier` here? Like:
// return pingStream.transform(TransformerSupplier { BadPingsMarker(mySignletonBadPingIdentifier) }, MY_STATE_STORE)
}
where my transformer is something like this:
class BadPingsMarker : Transformer<ID, Ping, KeyValue<ID, Ping>> {
var state: KeyValueStore<String, Tuple<String, String>>? = null
val logger: Logger = LogManager.getLogger(BadPingsMarker::class.java)!!
val pingIdentifier: BadPingIdentifier
override fun init(context: ProcessorContext, badPingIdentifier: BadPingIdentifier) {
state = context.getStateStore(MY_STATE_STORE) as KeyValueStore<String, Tuple<String, String>>
pingIdentifier = badPingIdentifier
}
override fun transform(key: ID, value: Ping): KeyValue<ID, Ping> {
val somevalue = value.somevalue
val state_checker = state!![MY_STATE_STORE_A]
// .. IMPLEMENT MARKING LOGIC HERE USING pingIdentifier
return KeyValue(key, value)
}
override fun close() {}
}
You can modify your class to accept BadPingIdentifier
in the constructor, like this:
class BadPingsMarker(private val pingIdentifier: BadPingIdentifier) : Transformer<ID, Ping, KeyValue<ID, Ping>> {
}
Then remove badPingIdentifier
parameter from init
method and remove val pingIdentifier: BadPingIdentifier
from class field declarations. Once you do that, your commented out code in identifyBadPings
will work.