Search code examples
javaapache-kafkapropertieskafka-producer-api

Kafka producer throws "key.serializer" exception


I set properties via constructor(partly), have tried with setProperties, put, ProducerConfig, and text values, the same result

public class KafkaProducer { 

private <T> void produce( T data, String topic) 
    {
        
        Gson gson = new GsonBuilder()
                .setPrettyPrinting()
                .registerTypeAdapter(LocalDate.class, new LocalDateAdapter())
                .create();
        String jsonString = gson.toJson(data);

        Properties kafkaProperties = new Properties();
        try(Producer<String, String> producer = new KafkaProducer<>(kafkaProperties))
        {
            kafkaProperties.setProperty(CLIENT_ID_CONFIG, MainProperties.get().kafkaProducerProperties.getClientId());
            kafkaProperties.setProperty(BOOTSTRAP_SERVERS_CONFIG, MainProperties.get().kafkaProducerProperties.getUrl());
            kafkaProperties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            kafkaProperties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            createTopic(topic, kafkaProperties);
            producer.send(new ProducerRecord<>(topic, jsonString));
        }


    public  void produceDataType1(KafkaType1Message kafkaType1Values)
    {
        produce(kafkaType1Values, MainProperties.get().kafkaProducerProperties.getType1Topic());
    }
    
    public  void produceDataType2(KafkaType2Message kafkaType2Values) 
    {
        produce(kafkaDailyDynamicValues, MainProperties.get().kafkaProducerProperties.getType2ValuesTopic());
    }




    public KafkaProducerProperties(Source source) 
    {
        super(source);
        this.url = value("url","");
        this.clientId = value("clientId", "TestProducer");
        this.type1ValuesTopic = value("type1", "type1_topic");
        this.type2ValuesTopic = value("type2", "type1_topic");
    }
    
    public static Factory<KafkaProducerProperties> factory()
    {
        return KafkaProducerProperties::new;
    }


org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:553)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
}}

Solution

  • You've created the producer in the try header, using an empty properties object.

    There's no exceptions that setProperty method will throw, so those don't need to be in the try body

    So,

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(CLIENT_ID_CONFIG, MainProperties.get().kafkaProducerProperties.getClientId());
    kafkaProperties.setProperty(BOOTSTRAP_SERVERS_CONFIG, MainProperties.get().kafkaProducerProperties.getUrl());
    try {
        // should only be done once, not every produce call 
        createTopic(topic, kafkaProperties);  
    } 
    
    kafkaProperties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    kafkaProperties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    try(Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
        producer.send(new ProducerRecord<>(topic, jsonString));
    } 
    

    You'll also want to add a catch to know if producing to the topic succeeds

    Also, you don't need producer configs to create a topic, and Kafka includes Jackson and its own JSONSerializer class, so you wouldn't need Gson