How to send Custom Object to Kafka Topic with Producer


I would like to send instances of my Account class to a Kafka topic with a producer, and then aggregate them with Kafka Streams. However, I cannot send the object; I get this error:

Caused by: org.apache.kafka.common.KafkaException: bank.Account is not an instance of org.apache.kafka.common.serialization.Serializer

My Producer class:

 public static void main(String[] args) {

        DataAccess dataAccess = new DataAccess();
        List<Account> accountList = dataAccess.read();

        final Logger logger = LoggerFactory.getLogger(Producer.class);
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,LongSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName());


        KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);



        for (Account account : accountList) {

            ProducerRecord<Long,Account> record = new ProducerRecord<Long, Account>("bank_account",account.getFromId(),account);


            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        logger.info("Record sent successfully. \n "+ "Topic : "+recordMetadata.topic() +"\n"+
                                "Partition : " + recordMetadata.partition() + "\n"+
                                "Offset : " +recordMetadata.offset() +"\n"+
                                "Timestamp: " +recordMetadata.timestamp() +"\n");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }

                    }
                    else {
                        // Log at error level and keep the exception so the cause is visible
                        logger.error("Error sending record", e);
                    }
                }
            });
        }


        producer.flush();
        producer.close();
    }

The error is thrown on this line:

KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);

My Account class:

public class Account {

    private long fromId;
    private long amount;
    private long toId;
    private ZonedDateTime time;
}

So my question is: how can we send a custom object to a Kafka topic? Afterwards I also want to consume that message, of course.


Solution

  • The problem is this line:

    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName());

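    The value of VALUE_SERIALIZER_CLASS_CONFIG must name a class that implements org.apache.kafka.common.serialization.Serializer. Account is a plain data class, so it cannot be used there; you must implement your own Serializer for it and configure that class instead.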


    Some people serialize to JSON; others use Avro or Protobuf. How you turn the object into a byte[] is an implementation detail, as long as the consumer is configured with a matching Deserializer.
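As a rough illustration of the "object to byte[]" step, here is a minimal sketch that uses only the JDK, so it runs without Kafka on the classpath. The names AccountCodec and AccountSerializer, the Account constructor, and the binary layout (three longs followed by a length-prefixed ISO-8601 timestamp) are assumptions made for this example; they are not part of the question's code or of Kafka. The sketch also assumes that time is never null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Stand-in for the question's Account class; the constructor is an assumption.
final class Account {
    final long fromId;
    final long amount;
    final long toId;
    final ZonedDateTime time;

    Account(long fromId, long amount, long toId, ZonedDateTime time) {
        this.fromId = fromId;
        this.amount = amount;
        this.toId = toId;
        this.time = time;
    }
}

// Hypothetical helper holding the actual byte[] conversion in both directions.
final class AccountCodec {
    private AccountCodec() {}

    // Layout (an assumption): fromId, amount, toId as longs, then a length-prefixed UTF-8 timestamp.
    static byte[] toBytes(Account a) {
        if (a == null) return null; // pass null values through unchanged
        byte[] time = a.time.toString().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(3 * Long.BYTES + Integer.BYTES + time.length);
        buf.putLong(a.fromId).putLong(a.amount).putLong(a.toId);
        buf.putInt(time.length).put(time);
        return buf.array();
    }

    // Reads the fields back in exactly the order toBytes wrote them.
    static Account fromBytes(byte[] bytes) {
        if (bytes == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long fromId = buf.getLong();
        long amount = buf.getLong();
        long toId = buf.getLong();
        byte[] time = new byte[buf.getInt()];
        buf.get(time);
        ZonedDateTime parsed = ZonedDateTime.parse(new String(time, StandardCharsets.UTF_8));
        return new Account(fromId, amount, toId, parsed);
    }

    public static void main(String[] args) {
        Account sent = new Account(1L, 250L, 2L,
                ZonedDateTime.of(2020, 1, 1, 10, 0, 0, 0, ZoneOffset.UTC));
        Account received = fromBytes(toBytes(sent));
        System.out.println(received.fromId + " -> " + received.toId
                + ": " + received.amount + " at " + received.time);
    }
}

// With kafka-clients on the classpath, a thin wrapper (hypothetical name) could look like:
//
//   public class AccountSerializer
//           implements org.apache.kafka.common.serialization.Serializer<Account> {
//       @Override
//       public byte[] serialize(String topic, Account data) {
//           return AccountCodec.toBytes(data);
//       }
//   }
//
// Older client versions also require you to implement configure(...) and close().
```

The producer would then be configured with AccountSerializer.class.getName() for VALUE_SERIALIZER_CLASS_CONFIG instead of Account.class.getName(). On the consuming side, a matching Deserializer<Account> would delegate to AccountCodec.fromBytes in the same way.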