I am trying to produce a record to Kafka without passing any partition value but want to send a header and I have the below constructor methods to produce record to Kafka:
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value, java.lang.Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value, java.lang.Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
ProducerRecord(java.lang.String topic, K key, V value)
Create a record to be sent to Kafka
ProducerRecord(java.lang.String topic, V value)
Create a record with no key
In all the above methods, there is no way I can send a header without sending a partition value, and if I set partition as null I am getting NullPointerException
.
Can you please advice me how to produce record in Kafka by sending a header and not partition value.
Kafka ProducerRecord Instance provides the Headers Instance where we can add key-value pairs.
You can go through the Kafka source code to get complete information.
Example
I tried with kafka-clients-0.8.2.0
but this functionality is not available that version. so you need to check also Kafka's client version also.
The example given below is using Kafka Client - 2.1.0
.
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
Headers headers = record.headers();
headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
producer.send(record);