apache-kafka kafka-producer-api

Produce Record to Kafka


I am trying to produce a record to Kafka with a header but without specifying a partition. ProducerRecord offers the following constructors:

ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)

Creates a record with a specified timestamp to be sent to a specified topic and partition

ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value, java.lang.Iterable<Header> headers)

Creates a record with a specified timestamp to be sent to a specified topic and partition

ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)

Creates a record to be sent to a specified topic and partition

ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value, java.lang.Iterable<Header> headers)

Creates a record to be sent to a specified topic and partition

ProducerRecord(java.lang.String topic, K key, V value)

Create a record to be sent to Kafka

ProducerRecord(java.lang.String topic, V value)

Create a record with no key

None of the constructors above accepts headers without also taking a partition argument, and when I set the partition to null I get a NullPointerException.

Can you please advise me how to produce a record to Kafka with a header but without a partition value?


Solution

  • A ProducerRecord exposes a Headers instance through headers(), and you can add key-value pairs to it after creating the record with a constructor that takes no partition.

    See the ProducerRecord source code for the full details:

    https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java

    Example

    I first tried kafka-clients 0.8.2.0, but headers are not available in that version, so check your Kafka client version as well. Record headers were added in Kafka 0.11.0.0.

    The example below uses Kafka client 2.1.0.

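            // No partition is passed here, so the producer's partitioner chooses one.
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
            // headers() returns the Headers instance attached to this record; add to it before send().
            Headers headers = record.headers();
            // KafkaHeaders, Strings, Network and logManager appear to be application helpers, not kafka-clients classes.
            headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
            headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
            producer.send(record);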
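
The constructor that accepts headers can also be used directly, because its partition parameter is a boxed Integer and may be left as null. The following is only a minimal sketch, assuming kafka-clients 0.11.0.0 or later is on the classpath. The class name, topic, header name and values are made up for illustration. One possible cause of the NullPointerException in the question is a null being unboxed into an int somewhere before the constructor call; that is a guess, not something the question confirms.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class HeaderWithoutPartition {

    // Builds a record that carries one header but leaves the partition unset.
    // The header name "client-app" and its value "demo" are hypothetical.
    public static ProducerRecord<String, byte[]> buildRecord(String topic, String key, byte[] value) {
        List<Header> headers = Collections.singletonList(
                new RecordHeader("client-app", "demo".getBytes(StandardCharsets.UTF_8)));

        // Keep the partition as a boxed Integer; assigning null to an int would throw.
        Integer partition = null;

        // Resolves to ProducerRecord(String, Integer, K, V, Iterable<Header>).
        return new ProducerRecord<>(topic, partition, key, value, headers);
    }

    public static void main(String[] args) {
        ProducerRecord<String, byte[]> record =
                buildRecord("demo-topic", "key-1", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("partition = " + record.partition());
        System.out.println("header    = " + record.headers().lastHeader("client-app").key());
    }
}
```

Building the record needs no broker connection, so this can be checked locally. Passing the result to producer.send(record) then lets the configured partitioner pick the partition, as with the constructors that take no partition.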