Search code examples
javaserializationapache-kafkaavroconfluent-schema-registry

Kafka Avro Serializer and deserializer exception. Avro supported types


I am seeing following error

exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

my kafka producer props are

Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");
    props.put("producer.type", "sync");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);

    Producer<String, TweetInfoDto> producer = new KafkaProducer(props);

and my kafka consumer props are

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");

    Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);

not sure what am I doing wrong.


Solution

  • TweetInfoDto cannot be a plain Java object that you have defined yourself.

    It ideally should be created from an Avro schema via the Avro Maven Plugin, for example.

    Please refer to the Schema Registry Tutorial for all the steps including defining an AVSC, and generating a Java class for it.

    Tutorial sample code here