Tags: apache-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Writing to a topic from a Processor in a Spring Cloud Streams Kafka Stream application


I am using the Processor API to do some low-level processing into a state store. The point is that I also need to write to a topic after storing into the store. How can this be done in a Spring Cloud Stream Kafka Streams application?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }

                override fun close() {}
            }
    }, "event-store")
}

Solution

  • You can't. The process() method is a terminal operation that does not allow you to emit data downstream. Instead, you can use transform() (it's basically the same as process(), but it allows you to emit data downstream); depending on your app, transformValues() or flatTransform() may also fit.

    Using transform() you get a KStream back, which you can then write to a topic with to().
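
    As a sketch, here is the question's processor reworked with transform(), assuming the same EventId/MappedEventValue types and the "event-store" state store from the question; the output topic name "processed-events" is a hypothetical placeholder:

    ```kotlin
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.Transformer
    import org.apache.kafka.streams.kstream.TransformerSupplier
    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore
    import org.springframework.context.annotation.Bean
    import java.util.function.Consumer

    @Bean
    fun processEvent() = Consumer<KStream<EventId, MappedEventValue>> { event ->
        event.transform(TransformerSupplier {
            object : Transformer<EventId, MappedEventValue, KeyValue<EventId, MappedEventValue>> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    @Suppress("UNCHECKED_CAST")
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun transform(key: EventId, value: MappedEventValue): KeyValue<EventId, MappedEventValue> {
                    store.put(key, value)
                    // Unlike process(), the returned KeyValue is forwarded downstream;
                    // returning null would emit nothing for this record.
                    return KeyValue(key, value)
                }

                override fun close() {}
            }
        }, "event-store") // attach the state store to this transformer
        .to("processed-events") // hypothetical output topic name
    }
    ```

    Note that the store name is passed as the second argument to transform() so Kafka Streams connects the store to the transformer; without it, getStateStore() would fail at runtime.
    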