I am using spring-cloud-stream-binder-kafka-streams:3.1.1
with functional programming. How can I retrieve all headers in processor function
Java code
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
// TODO investigate headers on the incoming message
// For example, find partition key on which message was received and publish to same partition key on destination topic
return input -> input;
}
}
In order to access the headers like that, you need to use the low-level processor/transformer API in Kafka Streams. You can mix the low-level processor API and the DSL while still is using it as a Spring Cloud Stream application. See this for more details. Basically, you need to use the processor in the case of a consumer and the transformer in the case of a function. The processor is a terminal API and does not allow you to continue further. On the other hand, when using the transformer, you can continue it as a KStream
after examining the headers. For example, here is an idea:
input -> input
.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<Object, String, KeyValue<Object, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// Here you can access the headers using this.context.headers()
return new KeyValue<>(key, value);
}
@Override
public void close() {
}
};
}
})
.map(...)
.groupBy(...)
...
Look at the comment inside the transform
method. There, you get access to the headers on each incoming record.
By looking at your question, I see that you are trying to get the partition id of the incoming record. For that, you can directly call context.partition(). I don't think you need to access the headers for that.
Here is an SO thread on accessing headers.