I am using the Processor API to do some low level processing into a state store. The point is I also need to write into a topic after storing into the store. How can it be done in a Spring Cloud Streams Kafka applications?
@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->
event.map{
...
}.process(ProcessorSupplier {
object : Processor<EventId, MappedEventValue> {
private lateinit var store: KeyValueStore<EventId, MappedEventValue>
override fun init(context: ProcessorContext) {
store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
}
override fun process(key: EventId, value: MappedEventValue) {
...
store.put(key, processedMappedEventValue)
//TODO Write into a topic
}
}
}
}
You can't. The process()
method is a terminal operation that does not allow you to emit data downstream. Instead, you can use transform()
though (it's basically the same a process()
but allows you to emit data downstream); or depending on your app, transformValues()
or flatTransform()
etc.
Using transform()
you get KStream
back, that you can write into a topic.