I am using the Spring Boot spring-kafka library, and I need to send and consume data over Kafka across multiple topics. Let's say I have two features, feature1 and feature2; currently this is the producer and consumer config I have:
@Bean
public Map<String, Object> featureProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return config;
}
// feature one producer bean
@Bean
public KafkaTemplate<Long, FeatureOneObject> featureOneKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}

// feature two producer bean
@Bean
public KafkaTemplate<Long, FeatureTwoObject> featureTwoKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}
@Bean
public Map<String, Object> featureConsumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}
// feature one consumer bean config
@Bean
public ConsumerFactory<Long, FeatureOneObject> featureOneConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(FeatureOneObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, FeatureOneObject> featureOneListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, FeatureOneObject> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureOneConsumerFactory());
    return factory;
}
// feature two consumer bean config
@Bean
public ConsumerFactory<Long, FeatureTwoObject> featureTwoConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(FeatureTwoObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, FeatureTwoObject> featureTwoListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, FeatureTwoObject> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureTwoConsumerFactory());
    return factory;
}
This way I have to create the same type of bean every time, which I was trying to avoid.
Instead of creating a separate producer and consumer bean for EACH feature, I wanted to check whether a single producer bean and a single consumer bean can send and consume data for all features, given that they all use the same serializers (LongSerializer for the key, JsonSerializer for the value).
I was trying out something like below:
// Producer bean
@Bean
public KafkaTemplate<Long, Object> producerKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}
// CONSUMER bean
@Bean
public ConsumerFactory<Long, Object> featureConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, Object> featureListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureConsumerFactory());
    return factory;
}
But I am getting the following error (it was thrown with different class names than the example above, though the structure is the same):
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.nsltxn.kafka.FilterRelatedCUDCDKafkaEventHandler.handleFilterRelatedCUDCDRequests(com.nsltxn.dto.FilterRelatedCUNotification)]
Bean [com.nsltxn.kafka.FilterRelatedCUDCDKafkaEventHandler@5fee09f9]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.nsltxn.dto.FilterRelatedCUNotification] for GenericMessage ...
Can someone help me with this? Is this approach even possible, and if so, what mistake might I be making? Thanks!
The template is fine, but on the consumer side, use a JsonMessageConverter with a ByteArrayDeserializer instead of a JsonDeserializer.
That way, the converter can detect the type to convert to from the method signature.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion
You can continue to use the JsonSerializer on the producer side.
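A minimal sketch of that suggestion, assuming spring-kafka is on the classpath; the bean names, topic name, and the ByteArrayJsonMessageConverter (the byte[] variant of the JsonMessageConverter) used here are illustrative choices, not taken from the question:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;

// One consumer factory shared by every feature: the value is left as a raw byte[]
// so no type information is needed at deserialization time.
@Bean
public ConsumerFactory<Long, byte[]> sharedConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

// One listener container factory shared by every feature: the message converter
// turns the byte[] into JSON and targets the parameter type of each listener method.
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, byte[]> sharedListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(sharedConsumerFactory());
    factory.setMessageConverter(new ByteArrayJsonMessageConverter());
    return factory;
}

// Each listener then declares its concrete DTO, and the converter infers the
// target type from the method signature:
// @KafkaListener(topics = "feature-one", containerFactory = "sharedListenerContainerFactory")
// public void onFeatureOne(FeatureOneObject event) { ... }
```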