I set properties via constructor(partly), have tried with setProperties, put, ProducerConfig, and text values, the same result
public class KafkaProducer {
private <T> void produce( T data, String topic)
{
Gson gson = new GsonBuilder()
.setPrettyPrinting()
.registerTypeAdapter(LocalDate.class, new LocalDateAdapter())
.create();
String jsonString = gson.toJson(data);
Properties kafkaProperties = new Properties();
try(Producer<String, String> producer = new KafkaProducer<>(kafkaProperties))
{
kafkaProperties.setProperty(CLIENT_ID_CONFIG, MainProperties.get().kafkaProducerProperties.getClientId());
kafkaProperties.setProperty(BOOTSTRAP_SERVERS_CONFIG, MainProperties.get().kafkaProducerProperties.getUrl());
kafkaProperties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
createTopic(topic, kafkaProperties);
producer.send(new ProducerRecord<>(topic, jsonString));
}
public void produceDataType1(KafkaType1Message kafkaType1Values)
{
produce(kafkaType1Values, MainProperties.get().kafkaProducerProperties.getType1Topic());
}
public void produceDataType2(KafkaType2Message kafkaType2Values)
{
produce(kafkaDailyDynamicValues, MainProperties.get().kafkaProducerProperties.getType2ValuesTopic());
}
public KafkaProducerProperties(Source source)
{
super(source);
this.url = value("url","");
this.clientId = value("clientId", "TestProducer");
this.type1ValuesTopic = value("type1", "type1_topic");
this.type2ValuesTopic = value("type2", "type1_topic");
}
public static Factory<KafkaProducerProperties> factory()
{
return KafkaProducerProperties::new;
}
org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:553)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
}}
You've created the producer in the try header, using an empty properties object.
There's no exceptions that setProperty method will throw, so those don't need to be in the try body
So,
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(CLIENT_ID_CONFIG, MainProperties.get().kafkaProducerProperties.getClientId());
kafkaProperties.setProperty(BOOTSTRAP_SERVERS_CONFIG, MainProperties.get().kafkaProducerProperties.getUrl());
try {
// should only be done once, not every produce call
createTopic(topic, kafkaProperties);
}
kafkaProperties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try(Producer<String, String> producer = new KafkaProducer<>(kafkaProperties)) {
producer.send(new ProducerRecord<>(topic, jsonString));
}
You'll also want to add a catch to know if producing to the topic succeeds
Also, you don't need producer configs to create a topic, and Kafka includes Jackson and its own JSONSerializer class, so you wouldn't need Gson