Tags: spring, spring-boot, spring-kafka

Can we use a single Spring Kafka producer and consumer bean for different topics (same serializers)?


I am using the spring-kafka library with Spring Boot, and I need to send and consume data over multiple Kafka topics. Say I have two features, feature1 and feature2. This is the producer and consumer config I currently have:

@Bean
public Map<String, Object> featureProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return config;
}

// feature one producer bean

@Bean
public KafkaTemplate<Long, FeatureOneObject> featureOneKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}

// feature two producer bean

@Bean
public KafkaTemplate<Long, FeatureTwoObject> featureTwoKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}

@Bean
public Map<String, Object> featureConsumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return props;
}

// feature one consumer bean config

@Bean
public ConsumerFactory<Long, FeatureOneObject> featureOneConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(FeatureOneObject.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, FeatureOneObject> featureOneListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, FeatureOneObject> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureOneConsumerFactory());

    return factory;
}

// feature two consumer bean config

@Bean
public ConsumerFactory<Long, FeatureTwoObject> featureTwoConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(FeatureTwoObject.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, FeatureTwoObject> featureTwoListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, FeatureTwoObject> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureTwoConsumerFactory());

    return factory;
}

This way I have to create the same kind of bean every time, and I would like to avoid that.

Instead of creating one producer bean and one consumer bean for EACH feature as above, I wanted to check whether a single producer bean and a single consumer bean can be used to send and consume data for all features, given that they all use the same serializers (LongSerializer for the key, JsonSerializer for the value).

I was trying out something like below:

// Producer bean

@Bean
public KafkaTemplate<Long, Object> producerKafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(featureProducerFactory()));
}

// CONSUMER bean

@Bean
public ConsumerFactory<Long, Object> featureConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(featureConsumerConfigs(), new LongDeserializer(),
            new JsonDeserializer<>(Object.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, Object> featureListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(featureConsumerFactory());

    return factory;
}

But I get the following error (the class names in the error differ from the ones in the example above, though the structure is the same):

Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.nsltxn.kafka.FilterRelatedCUDCDKafkaEventHandler.handleFilterRelatedCUDCDRequests(com.nsltxn.dto.FilterRelatedCUNotification)]
Bean [com.nsltxn.kafka.FilterRelatedCUDCDKafkaEventHandler@5fee09f9]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.nsltxn.dto.FilterRelatedCUNotification] for GenericMessage ......

Can someone help me with this? Is this approach even possible, and if so, what mistake am I making? Thanks!


Solution

  • The template is fine, but on the Consumer side, use a JsonMessageConverter with a ByteArrayDeserializer instead of a JsonDeserializer.

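    That way, the converter can detect the type to convert to from the listener method's signature. A JsonDeserializer configured for Object has no target type to work with, so Jackson produces a LinkedHashMap, which is what the error above complains about.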

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion

    You can continue to use the JsonSerializer on the producer side.
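The consumer-side change described above could look roughly like the sketch below. It is not taken from the original answer: the topic names, the group id, the class name SharedKafkaConsumerConfig, and the two minimal payload classes are placeholders for illustration. It also assumes a recent spring-kafka version in which the container factory exposes setRecordMessageConverter (older versions offer setMessageConverter instead, as far as I know).

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.stereotype.Component;

@Configuration
@EnableKafka
public class SharedKafkaConsumerConfig {

    // One consumer factory for every feature: values are left as raw bytes,
    // so nothing here is tied to a specific payload class.
    @Bean
    public ConsumerFactory<Long, byte[]> featureConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new DefaultKafkaConsumerFactory<>(
                props, new LongDeserializer(), new ByteArrayDeserializer());
    }

    // One container factory for every feature: the JSON-to-object step is
    // delegated to a message converter, which sees the listener method's
    // parameter type at invocation time.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Long, byte[]> featureListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(featureConsumerFactory());
        // Assumption: recent spring-kafka; older versions use setMessageConverter(...).
        factory.setRecordMessageConverter(new JsonMessageConverter());
        return factory;
    }
}

// Both listeners share the same container factory; only the parameter type differs.
@Component
class FeatureListeners {

    // "feature1-topic" and "feature-group" are placeholder names.
    @KafkaListener(topics = "feature1-topic", groupId = "feature-group",
            containerFactory = "featureListenerContainerFactory")
    public void onFeatureOne(FeatureOneObject payload) {
        System.out.println("feature1: " + payload.name);
    }

    // "feature2-topic" is a placeholder name.
    @KafkaListener(topics = "feature2-topic", groupId = "feature-group",
            containerFactory = "featureListenerContainerFactory")
    public void onFeatureTwo(FeatureTwoObject payload) {
        System.out.println("feature2: " + payload.count);
    }
}

// Minimal stand-ins for the question's payload classes (fields are made up).
class FeatureOneObject {
    public String name;
}

class FeatureTwoObject {
    public int count;
}
```

On the producer side, the single KafkaTemplate<Long, Object> from the question can stay as it is, for example producerKafkaTemplate.send("feature1-topic", 1L, someFeatureOneObject) for one feature and the same call with the other topic and payload for the second.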