Here is the Java code for producing data in Kafka:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ExampleClass {
private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "confbroker:9092";
private static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
private static void runProducer() throws Exception {
final Producer<Long, String> producer = createProducer();
long sensorId = 1001L;
try {
for (long index = sensorId; index < sensorId + 5; index++) {
final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "This is sensor no: " + index);
RecordMetadata metadata = producer.send(record).get();
System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d)\n", record.key(),
record.value(), metadata.partition(), metadata.offset());
}
} finally {
producer.flush();
producer.close();
}
}
public static void main(String... args) throws Exception {
runProducer();
}
}
When running console consumer in Confluent 5.4.0, I am getting result as:
The key is gibberish.
How can I produce Key of either Int or Long type.
PS:
=> Same result in Confluent 5.5 also.
=> Same result with IntegerSerializer.
The console consumer uses StringDeserialisers as default for the key and the value. If you want to deserialise the key as Long
you have to explicitly mention that in your console-consumer command:
--property key.deserializer org.apache.kafka.common.serialization.LongDeserializer