java, apache-kafka, kafka-producer-api

How to create a custom serializer in Kafka?


Only a few serializers are available out of the box, such as:

org.apache.kafka.common.serialization.StringSerializer

How can we create our own custom serializer?


Solution

  • Here is an example of using your own serializer/deserializer for the Kafka message value. The same approach works for the message key.

    We want to send a serialized version of MyMessage as the Kafka value and deserialize it back into a MyMessage object on the consumer side.

    Serializing MyMessage on the producer side.

    Create a serializer class that implements org.apache.kafka.common.serialization.Serializer.

    The serialize() method does the work: it receives your object and returns a serialized version as a byte array.

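    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    public class MyValueSerializer implements Serializer<MyMessage>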
    {
        private boolean isKey;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey)
        {
            this.isKey = isKey;
        }
    
        @Override
        public byte[] serialize(String topic, MyMessage message)
        {
            if (message == null) {
                return null;
            }
    
            try {
    
                // TODO: serialize your MyMessage object into a byte array named "bytes"
    
                return bytes;
    
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error serializing value", e);
            }
        }
    
        @Override
        public void close()
        {
    
        }
    }
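
    What goes in place of the placeholder inside serialize() depends on how MyMessage is defined, which the answer does not show. As a minimal sketch, assume a hypothetical MyMessage with an int id and a String text, encoded as a 4-byte id, a 4-byte text length and the UTF-8 bytes of the text. The sketch uses only the JDK; the class and field names are illustrative, and the body of toBytes() is the part that would sit inside the try block of serialize().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical message type; the original answer does not define MyMessage.
class MyMessage {
    final int id;
    final String text;

    MyMessage(int id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Assumed wire format: [int id][int textLength][UTF-8 text bytes].
class MyMessageEncoder {
    static byte[] toBytes(MyMessage message) {
        if (message == null) {
            return null; // same null handling as serialize()
        }
        byte[] text = message.text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + text.length);
        buffer.putInt(message.id);   // bytes 0..3, big-endian
        buffer.putInt(text.length);  // bytes 4..7
        buffer.put(text);            // remaining bytes
        return buffer.array();
    }
}
```

    A length-prefixed layout like this is only one option; JSON, Avro or Protobuf encoders would fit into serialize() in the same way.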
    
    Then use it like this:

    final IntegerSerializer keySerializer = new IntegerSerializer();
    final MyValueSerializer myValueSerializer = new MyValueSerializer();
    final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
    
    int messageNo = 1;
    int kafkaKey = messageNo;
    MyMessage kafkaValue = new MyMessage();
    ProducerRecord<Integer, MyMessage> producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
    producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
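
    Instead of passing serializer instances to the constructor, the producer can also be told which classes to use through its configuration. Kafka then instantiates them itself, so they need a public no-argument constructor. A minimal sketch of such a configuration, using only java.util.Properties; the broker address and the com.example package are placeholders, not values from the original answer:

```java
import java.util.Properties;

class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        // Placeholder broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        // Hypothetical package; use the fully qualified name of your own class.
        props.put("value.serializer", "com.example.MyValueSerializer");
        return props;
    }
}
```

    With properties like these, new KafkaProducer<Integer, MyMessage>(props) should be enough, and configure() is called on each serializer with isKey set accordingly.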
    

    Deserializing MyMessage on the consumer side.

    Create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.

    The deserialize() method does the work: it receives the serialized value as a byte array and returns your object.

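    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class MyValueDeserializer implements Deserializer<MyMessage>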
    {
        private boolean isKey;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey)
        {
            this.isKey = isKey;
        }
    
        @Override
        public MyMessage deserialize(String topic, byte[] value)
        {
            if (value == null) {
                return null;
            }
    
            try {
    
                // TODO: deserialize value and use it to populate the MyMessage object below
    
                MyMessage message = new MyMessage();
                return message;
    
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error deserializing value", e);
            }
        }
    
        @Override
        public void close()
        {
    
        }
    }
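
    The decoding step inside deserialize() has to mirror whatever the serializer wrote. As a minimal sketch, assume a hypothetical MyMessage with an int id and a String text, and a byte layout of a 4-byte id, a 4-byte text length and the UTF-8 bytes of the text. None of these names or the layout come from the original answer; the body of fromBytes() is the part that would sit inside the try block of deserialize().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical message type; the original answer does not define MyMessage.
class MyMessage {
    final int id;
    final String text;

    MyMessage(int id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Assumed wire format: [int id][int textLength][UTF-8 text bytes].
class MyMessageDecoder {
    static MyMessage fromBytes(byte[] value) {
        if (value == null) {
            return null; // same null handling as deserialize()
        }
        ByteBuffer buffer = ByteBuffer.wrap(value);
        int id = buffer.getInt();        // first 4 bytes, big-endian
        int length = buffer.getInt();    // next 4 bytes
        byte[] text = new byte[length];
        buffer.get(text);                // throws if fewer than length bytes remain
        return new MyMessage(id, new String(text, StandardCharsets.UTF_8));
    }
}
```

    Truncated or corrupt input makes ByteBuffer throw a runtime exception here, which the catch block in deserialize() would wrap in a SerializationException.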
    

    Then use it like this:

    final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
    final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
    final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
    
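    // poll(long) is deprecated in newer clients; poll(Duration) needs import java.time.Duration.
    ConsumerRecords<Integer, MyMessage> records = consumer.poll(Duration.ofMillis(1000));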
    for (ConsumerRecord<Integer, MyMessage> record : records) {
    
        int kafkaKey = record.key();
        MyMessage kafkaValue = record.value();
    
        ...
    }