Tags: spring, spring-boot, kafka-consumer-api, apache-kafka-streams, spring-kafka

Get Partition and Offset number in which kafka message is being processed using StreamBridge


I need to print, log, or store the Kafka partition and offset of the message being processed. How can I achieve that? I am using StreamBridge to send the message from the producer, and I am also using the functional Spring Kafka Streams approach.

public void delegateToSupplier(String id, Abc obj) {
    // Use the id as the record key so related messages land on the same partition
    Message<Abc> message = MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
    streamBridge.send("out-topic", message);
}

Solution

  • On the producer side, the record metadata (topic, partition, and offset) is available asynchronously via the record metadata channel configured on the producer binding:

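    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.Message;
    
    @SpringBootApplication
    public class So66436499Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66436499Application.class, args);
        }
    
        @Autowired
        StreamBridge bridge;
    
        @Bean
        public ApplicationRunner runner() {
            return args -> {
                this.bridge.send("myBinding", "test");
                // Keep the application alive long enough for the asynchronous send result to arrive
                Thread.sleep(5000);
            };
        }
    
        // Receives each successfully sent message; "meta" matches record-metadata-channel below
        @ServiceActivator(inputChannel = "meta")
        void meta(Message<?> sent) {
            System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
        }
    
    }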
    
    # application.properties
    spring.cloud.stream.bindings.myBinding.destination=foo
    spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
    
    # Console output, printed as topic-partition@offset
    Sent: foo-0@5
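
The printed value `foo-0@5` is the string form of `RecordMetadata`: topic `foo`, partition `0`, offset `5`. Inside the `meta` handler you would normally call `partition()` and `offset()` on the `RecordMetadata` object directly. The sketch below is only a plain-Java illustration of how that printed form breaks down, for example when reading it back from a log line. `RecordPosition` and `parse` are hypothetical names for this example, not part of Spring or the Kafka client.

```java
// Hypothetical helper for illustration only; not part of Spring or the Kafka client.
final class RecordPosition {
    final String topic;
    final int partition;
    final long offset;

    RecordPosition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    // Parses the "topic-partition@offset" form, e.g. "foo-0@5".
    // The search starts from the end because topic names may themselves contain '-'.
    static RecordPosition parse(String text) {
        int at = text.lastIndexOf('@');
        int dash = text.lastIndexOf('-', at);
        if (at < 0 || dash < 0) {
            throw new IllegalArgumentException("Expected topic-partition@offset but got: " + text);
        }
        String topic = text.substring(0, dash);
        int partition = Integer.parseInt(text.substring(dash + 1, at));
        long offset = Long.parseLong(text.substring(at + 1));
        return new RecordPosition(topic, partition, offset);
    }

    public static void main(String[] args) {
        RecordPosition p = RecordPosition.parse("foo-0@5");
        System.out.println("topic=" + p.topic + " partition=" + p.partition + " offset=" + p.offset);
    }
}
```

With the question's destination name, `RecordPosition.parse("out-topic-3@42")` would yield topic `out-topic`, partition `3`, and offset `42`, since only the last hyphen before the `@` separates the partition.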