I am learning how to use Spring for Apache Kafka 3.3.1. I am following the official documentation (here, here and here) to set the JSON value serializer for my KafkaTemplate.
Unfortunately, in my case, initializing the ProducerFactory<> using the Map<String, Object> configs constructor does not set the serializers, and my template falls back to the default serializers. Only the constructor that takes the config map plus the key and value serializers works properly. It seems I am missing something, but I cannot see what. I would like to use the simple constructor with the single Map parameter, as shown in the docs:
return new DefaultKafkaProducerFactory<>(producerConfigs())
My Kafka configuration:
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.producer.bootstrap.servers:kafka-1.hello.com:9092, kafka-2.hello.com:9092}")
private String kafkaBootstrapServers;
@Value("${kafka.producer.enable.idempotence:true}")
private String kafkaProducerEnableIdempotence;
@Value("${kafka.producer.acks:all}")
private String kafkaProducerAcks;
@Value("${kafka.producer.retries:2147483647}")
private String kafkaProducerRetries;
@Value("${kafka.producer.linger.ms:0}")
private String kafkaProducerLingerMs;
@Value("${kafka.producer.delivery.timeout.ms:120000}")
private String kafkaProducerDeliveryTimeoutMs;
@Value("${kafka.producer.request.timeout.ms:30000}")
private String kafkaProducerRequestTimeoutMs;
@Value("${kafka.producer.retry.backoff.ms:100}")
private String kafkaProducerRetryBackoffMs;
@Value("${kafka.producer.retry.backoff.max.ms:1000}")
private String kafkaProducerRetryBackoffMaxMs;
@Value("${kafka.topic.name:topic1}")
private String kafkaTopicName;
@Value("${kafka.topic.partitions:1}")
private int kafkaTopicPartitions;
@Value("${kafka.topic.replicas:1}")
private int kafkaTopicReplicas;
@Bean
public ProducerFactory<String, Event> producerFactory() {
// setting the key and value serializers this way does not work:
// KafkaTemplate falls back to the default serializers
//DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(producerConfiguration());
// this sets the serializers properly
DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(
producerConfiguration(),
new StringSerializer(),
new JsonSerializer<>());
factory.setProducerPerThread(true);
return factory;
}
@Bean
public KafkaTemplate<String, Event> kafkaTemplate() {
var factory = producerFactory();
log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
return new KafkaTemplate<>(factory);
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic() {
log.debug(
"creating a new kafka topic: {\"name\": \"{}\", \"partitions\": {}, \"replicas\": {}}",
kafkaTopicName,
kafkaTopicPartitions,
kafkaTopicReplicas);
return TopicBuilder.name(kafkaTopicName)
.partitions(kafkaTopicPartitions)
.replicas(kafkaTopicReplicas)
.build();
}
private Map<String, Object> producerConfiguration() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
return configs;
}
private String factoryConfigurationToString(ProducerFactory<String, Event> producerFactory) {
var keySerializer = producerFactory.getKeySerializer();
var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();
var valueSerializer = producerFactory.getValueSerializer();
var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();
var sb = new StringBuilder().append("configuration: ").append("{");
producerFactory
.getConfigurationProperties()
.forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));
sb.setLength(sb.length() - 2);
sb.append("}, ");
sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
sb.append("value-serializer: ").append(valueSerializerAsString);
return sb.toString();
}
}
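For completeness, this is how I use the template; the Event field shown is a hypothetical stand-in. With the JsonSerializer in place, the payload on the topic is a readable JSON document instead of binary:

```java
// Sketch of a producer-side service; Event and its "id" field are assumptions.
@Service
@RequiredArgsConstructor
public class EventPublisher {

    private final KafkaTemplate<String, Event> kafkaTemplate;

    public void publish(Event event) {
        // With JsonSerializer configured, the record value is written as JSON,
        // which is what a topic browser should display.
        kafkaTemplate.send("topic1", event.getId(), event);
    }
}
```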
I always verify the result with a topic browser as well. In the first case I see binary content on the topic, not JSON.
Log using the constructor with only the Map:
DEBUG 200 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting:
{
"configuration":{
"retries":"5",
"enable.idempotence":"true",
"retry.backoff.max.ms":"1000",
"value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
"request.timeout.ms":"30000",
"acks":"all",
"bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
"delivery.timeout.ms":"120000",
"retry.backoff.ms":"100",
"key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
"linger.ms":"0"
},
"key-serializer":null, <-- WRONG
"value-serializer":null <-- WRONG
}
DEBUG 200 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : creating a new kafka topic:
{
"name":"incoming",
"partitions":2,
"replicas":2
}
Log using the constructor with three parameters:
DEBUG 201 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting:
{
"configuration":{
"retries":"5",
"enable.idempotence":"true",
"retry.backoff.max.ms":"1000",
"value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
"request.timeout.ms":"30000",
"acks":"all",
"bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
"delivery.timeout.ms":"120000",
"retry.backoff.ms":"100",
"key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
"linger.ms":"0"
},
"key-serializer":"org.apache.kafka.common.serialization.StringSerializer", <-- OK
"value-serializer":"org.springframework.kafka.support.serializer.JsonSerializer" <-- OK
}
DEBUG 201 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : creating a new kafka topic:
{
"name":"incoming",
"partitions":2,
"replicas":2
}
The problem was related to my kafka-consumer configuration; my kafka-producer configuration was correct. The null key-serializer / value-serializer in the first log is actually expected: getKeySerializer() and getValueSerializer() only return serializer instances that were passed to the factory explicitly, while serializers configured via the Map are instantiated later by the Kafka client itself.
If you want to provide type info for the JsonDeserializer, then you need to use the three-argument constructor of the DefaultKafkaConsumerFactory:
@Bean
public ConsumerFactory<String, Event> consumerConfigs() {
return new DefaultKafkaConsumerFactory<>(
consumerConfiguration(),
new StringDeserializer(),
new JsonDeserializer<>(Event.class)); // use typed deserializer
}
If you do not need to provide type info, then you can use the default constructor of the JsonDeserializer with the Map config:
@Bean
public ConsumerFactory<String, Event> consumerConfigs() {
return new DefaultKafkaConsumerFactory<>(consumerConfiguration());
}
private Map<String, Object> consumerConfiguration() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
...
return configs;
}
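If you do want the config-map-only approach on the consumer side, the type info can be supplied as properties instead; a sketch using the constants exposed by JsonDeserializer (the package name below is an assumption):

```java
// Additional entries for consumerConfiguration() when the deserializer
// is configured only via the map:
configs.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Event.class.getName());
// Packages containing the target types must be trusted explicitly,
// otherwise deserialization is rejected.
configs.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events"); // hypothetical package
```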