java apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka

How to retrieve/set header using functional approach in spring-cloud-stream-binder-kafka-streams:3.1.1


I am using spring-cloud-stream-binder-kafka-streams:3.1.1 with the functional programming model. How can I retrieve all headers in the processor function?

Java code

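import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaMessageApplication {
    public static void main(String[] args) {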
        SpringApplication.run(KafkaMessageApplication.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        // TODO investigate headers on the incoming message
        // For example, find partition key on which message was received and publish to same partition key on destination topic
        return input -> input;
    }
}

Solution

  • To access the headers like that, you need to use the low-level processor/transformer API in Kafka Streams. You can mix the low-level Processor API with the DSL and still run it as a Spring Cloud Stream application. See this for more details. Basically, you use a processor in the case of a consumer and a transformer in the case of a function. The processor is a terminal API, so nothing can be chained after it. With the transformer, on the other hand, you can examine the headers and then continue working with the result as a KStream. For example, here is an idea:

    input -> input
            .transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
                @Override
                public Transformer<String, String, KeyValue<String, String>> get() {
                    // The type parameters must match the supplier's, otherwise this does not compile
                    return new Transformer<String, String, KeyValue<String, String>>() {
                        private ProcessorContext context;

                        @Override
                        public void init(ProcessorContext context) {
                            // Keep the context so transform() can read per-record metadata
                            this.context = context;
                        }

                        @Override
                        public KeyValue<String, String> transform(String key, String value) {
                            // Here you can access the headers using this.context.headers()
                            return new KeyValue<>(key, value);
                        }

                        @Override
                        public void close() {
                            // Nothing to clean up
                        }
                    };
                }
            })
            .map(...)
            .groupBy(...)
            ...
    

    Look at the comment inside the transform method. There, you get access to the headers on each incoming record.

    By looking at your question, I see that you are trying to get the partition id of the incoming record. For that, you can directly call context.partition(). I don't think you need to access the headers for that.

    Here is an SO thread on accessing headers.
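
Once you have the headers inside the transform method, keep in mind that each Kafka header is a String key paired with a raw byte[] value, so you usually have to decode it yourself. Below is a minimal, dependency-free sketch of that decoding step. HeaderDecoder is a hypothetical helper (not part of Kafka or Spring Cloud Stream), each Map.Entry stands in for one Kafka header, and it assumes the producer wrote the values as UTF-8 text, which may not hold for your topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka or Spring Cloud Stream.
final class HeaderDecoder {

    private HeaderDecoder() {
    }

    // Each entry stands in for one Kafka header: a String key and a raw byte[] value.
    // Assumption: values are UTF-8 encoded text.
    static Map<String, String> decode(List<Map.Entry<String, byte[]>> headers) {
        // LinkedHashMap keeps the headers in the order they were first seen
        Map<String, String> decoded = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            byte[] raw = header.getValue();
            // A header value can be null; keep the key and store null for it.
            // If a key appears more than once, the last value wins in this sketch.
            decoded.put(header.getKey(),
                    raw == null ? null : new String(raw, StandardCharsets.UTF_8));
        }
        return decoded;
    }
}
```

Inside transform, you could build the input list by iterating over this.context.headers() and copying each header's key() and value() into an entry before calling HeaderDecoder.decode.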