Search code examples
java spring spring-boot apache-kafka spring-kafka

How to make a generic Kafka consumer factory?


Right now I'm trying to make my two microservices communicate with each other. The first one sends a message like this:

/**
 * Publishes the posted {@code UserCreatedEvent} to the {@code user} topic under the key
 * {@code user-created} and blocks until the send completes.
 *
 * <p>The record carries a {@code spring.kafka.serialization.selector} header whose value is
 * {@code userCreated}.
 *
 * @param userCreatedEvent event read from the request body and used as the record value
 * @return 200 with the textual metadata of the acknowledged record, or 500 with
 *     {@code "Broker error!"} when the send fails or the waiting thread is interrupted
 */
@PostMapping("/testPost")
    public ResponseEntity<?> testPost(@RequestBody UserCreatedEvent userCreatedEvent) {
        try {
            ProducerRecord<String, Object> producerRecord =
                    new ProducerRecord<>("user", "user-created", userCreatedEvent);
            // Encode the selector explicitly so the header bytes do not depend on the platform charset.
            producerRecord.headers().add(
                    "spring.kafka.serialization.selector",
                    "userCreated".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            SendResult<String, Object> result =
                    kafkaTemplate.send(producerRecord).get();

            // The original returned an undefined identifier; report where the record was written.
            return ResponseEntity.ok(String.valueOf(result.getRecordMetadata()));
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently consuming it.
            Thread.currentThread().interrupt();
            return ResponseEntity.internalServerError().body("Broker error!");
        } catch (Exception e) {
            return ResponseEntity.internalServerError().body("Broker error!");
        }
    }

Here is producer config:

/**
 * Spring configuration that publishes the Kafka producer factory and template as beans.
 *
 * <p>This class shares its simple name with Kafka's own {@code ProducerConfig}, so the Kafka
 * constants are referenced through their fully qualified name.
 */
@Configuration
public class ProducerConfig {
    /** Broker address list, injected from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String serverAddress;

    /**
     * Creates the producer factory from the settings assembled by {@link #producerSettings()}.
     *
     * @return a factory producing {@code String}-keyed producers
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerSettings());
    }

    /**
     * Creates the template used to send records, backed by {@link #producerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /** Assembles the producer properties: connection, serializers, then delivery settings. */
    private Map<String, Object> producerSettings() {
        Map<String, Object> settings = new HashMap<>();

        // Connection and serialization.
        settings.put(
                org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                serverAddress);
        settings.put(
                org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        settings.put(
                org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        // Delivery settings: acks from all replicas, idempotent producer, up to 5 retries.
        settings.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "all");
        settings.put(
                org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
                true);
        settings.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, 5);

        return settings;
    }
}

So when my consumer microservice receives the message, I get a lot of exceptions, caused by:

Caused by: java.lang.IllegalArgumentException: The class 'org.example.testsender.dto.UserCreatedEvent' is not in the trusted packages: [java.util, java.lang, org.example.userservice.event, org.example.userservice.event.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Also, for some reason, my consumer can't see ErrorHandlingDeserializer:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

Here is my consumer config:

/**
 * Kafka consumer wiring for listeners that receive several event types on one topic.
 *
 * <p>Record values go through a {@code DelegatingDeserializer} that holds one delegate per
 * selector value. Both the key and value deserializers are wrapped in an
 * {@code ErrorHandlingDeserializer}, so a deserialization failure is handed to the configured
 * {@code DefaultErrorHandler} instead of being thrown out of the consumer poll.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    /** Broker address list, injected from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    /** Consumer group id, injected from {@code spring.kafka.consumer.group-id}. */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Builds the consumer factory shared by all listeners.
     *
     * <p>Deserializer <em>instances</em> handed to {@code DefaultKafkaConsumerFactory} are used
     * in place of the {@code *_DESERIALIZER_CLASS} properties. The previous version set those
     * properties and then passed bare instances, so the {@code ErrorHandlingDeserializer} was
     * never installed. The wrapping is now done on the instances themselves and the ineffective
     * properties are gone.
     *
     * @param delegatingDeserializer value deserializer that picks a delegate per record
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory(DelegatingDeserializer delegatingDeserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // JsonDeserializer.TRUSTED_PACKAGES is intentionally not set here: the JSON delegates are
        // built programmatically and ignore type headers, so trusting all packages is unnecessary.

        Deserializer<String> keyDeserializer =
                new ErrorHandlingDeserializer<>(new StringDeserializer());
        Deserializer<Object> valueDeserializer =
                new ErrorHandlingDeserializer<>(delegatingDeserializer);

        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
    }

    /**
     * Registers one JSON deserializer per selector value.
     *
     * <p>Each delegate is created with {@code useHeadersIfPresent = false}, so the payload is
     * always mapped to this service's own class. The producer's type header names a class from
     * the producer's package, which would otherwise be rejected as untrusted.
     *
     * @return the delegating deserializer with the {@code userCreated} delegate registered
     */
    @Bean
    public DelegatingDeserializer delegatingDeserializer() {
        Map<String, Deserializer<?>> delegates = new HashMap<>();
        delegates.put("userCreated", new JsonDeserializer<>(UserCreatedEvent.class, false));

        return new DelegatingDeserializer(delegates);
    }

    /**
     * Creates the listener container factory with a concurrency of 3 and the shared error
     * handler.
     *
     * @param consumerFactory factory used to create the consumers
     * @param errorHandler handler applied to listener and deserialization failures
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }

    /**
     * Creates the error handler: a fixed back-off of 3000 ms with at most 3 retries, followed by
     * publication through a {@code DeadLetterPublishingRecoverer}.
     *
     * @param kafkaTemplate template used by the recoverer to publish failed records
     * @return the configured error handler
     */
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        DefaultErrorHandler errorHandler =
                new DefaultErrorHandler(
                        new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(3000, 3));
        errorHandler.addRetryableExceptions(ServiceUnavailable.class);
        // These failures will not succeed on redelivery, so they skip the back-off.
        errorHandler.addNotRetryableExceptions(
                ServerSideException.class,
                ClientSideException.class,
                DeserializationException.class,
                JsonProcessingException.class,
                MismatchedInputException.class);

        return errorHandler;
    }
}

So, when I'm setting consumer properties like this:

 // Value side: ErrorHandlingDeserializer delegating to JsonDeserializer with a fixed default type.
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
 props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
 props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserCreatedEvent.class);

My message is sent to the dead letter topic. Why? Why wasn't it sent to the DLT with the previous configuration? And the main question is: how do I make a generic consumer factory? For now I have only one event type, but later I will have more. Should I make a consumer factory for each? I don't think that's a good idea. According to the docs, as I understood them, it used to be possible to use

DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG

But now there is no such thing anymore, so I replaced it with a @Bean of DelegatingDeserializer (you can see it in my consumer configuration). Maybe I did it wrong? What is the right way to make a generic consumer factory that supports multiple event types for one topic? By the way, here is my listener:

/**
 * Listener bean for records arriving on the {@code user} topic.
 *
 * <p>NOTE(review): {@code @KafkaHandler} is normally paired with a class-level
 * {@code @KafkaListener}. Here the listener annotation sits on a method, so confirm that
 * {@code defaultHandler} is ever invoked.
 */
@Slf4j
@Component
public class UserCreatedKafkaEventListener {

    /**
     * Handles a record from the {@code user} topic (group {@code users}) and logs its key.
     *
     * @param consumerRecord the received record
     */
    @KafkaListener(topics = "user", groupId = "users")
    public void userCreatedEventHandler(ConsumerRecord<String, UserCreatedEvent> consumerRecord) {
        final String recordKey = consumerRecord.key();
        // The value is read into a typed local, exactly as before, although it is not logged.
        final UserCreatedEvent payload = consumerRecord.value();

        log.info("Handling event with key: {}", recordKey);
    }

    /**
     * Fallback handler that logs the received object.
     *
     * @param o the received payload
     */
    @KafkaHandler(isDefault = true)
    public void defaultHandler(Object o) {
        log.info("Received unknown event: {}", o);
    }
}

UPDATE: I created a GitHub repository with a test project to reproduce the problem: https://github.com/Denstran/TestKafkaProject

UPDATE 2: I fixed the deserialization issue by adding containerFactory = "containerFactory" to @KafkaListener. But now I'm dealing with another problem. I have several classes annotated with @KafkaListener, and those classes have @KafkaHandler methods for specific event object types, like this:

/**
 * Class-level listener on the {@code users} topic (group {@code user}) with one typed handler
 * and one default handler.
 */
@Slf4j
@Component
@KafkaListener(topics = "users", groupId = "user")
public class UserCreatedKafkaEventListener {

    /**
     * Handles payloads of type {@code UserCreatedEvent} by logging their class name and content.
     *
     * @param userCreatedEvent the received event
     */
    @KafkaHandler
    public void handleEvent(UserCreatedEvent userCreatedEvent) {
        final String typeName = userCreatedEvent.getClass().getCanonicalName();
        log.info("Handling user created event: {}, {}", typeName, userCreatedEvent);
    }

    /**
     * Default handler: logs the payload, then logs it again together with its class name.
     *
     * @param object the received payload
     */
    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        log.info("Handling default object: {}", object);

        // Resolved only after the first log line, matching the original evaluation order.
        final String typeName = object.getClass().getCanonicalName();
        log.info("Handling object with class name: {}, object: {}", typeName, object);
    }
}

So the problem is that all events I send via the producer now end up in the @KafkaHandler(isDefault = true) method of the UserCreatedKafkaEventListener class, instead of in the handlers for their types in the other classes. Here are the updated consumer and producer configs of my new project (the link to it is provided above):

/**
 * Builds a consumer factory driven purely by properties: string keys, and JSON values behind an
 * {@code ErrorHandlingDeserializer}, with type resolution through type-info headers and the
 * {@code userCreated} / {@code userUpdated} type mappings.
 *
 * @return the consumer factory
 */
@Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> settings = new HashMap<>();

        // Connection and group.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserializer chain.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        settings.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        // JSON type resolution.
        settings.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        settings.put(
                JsonDeserializer.TYPE_MAPPINGS,
                "userCreated:org.example.userservice.dto.UserCreatedEvent, userUpdated:org.example.userservice.dto.UserUpdatedEvent");
        settings.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
        settings.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);

        return new DefaultKafkaConsumerFactory<>(settings);
    }
/**
 * Builds a producer factory with string keys and JSON values that adds type-info headers using
 * the {@code userCreated} / {@code userUpdated} type mappings.
 *
 * @return the producer factory
 */
@Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> settings = new HashMap<>();

        // Connection and serializers.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Type tokens written to the record headers.
        settings.put(
                JsonSerializer.TYPE_MAPPINGS,
                "userCreated:org.example.producerservice.dto.UserCreatedEvent, userUpdated:org.example.producerservice.dto.UserUpdatedEvent");
        settings.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);

        // Delivery settings: acks from all replicas, idempotent producer, up to 5 retries.
        settings.put(ProducerConfig.ACKS_CONFIG, "all");
        settings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        settings.put(ProducerConfig.RETRIES_CONFIG, 5);

        return new DefaultKafkaProducerFactory<>(settings);
    }

Solution

  • So, following the advice of Artem Bilan, I checked whether my consumer factory was actually being used by Spring Boot, and found that it definitely was not. How so? Well, as Artem said, Spring Boot provides a @Bean of ConcurrentKafkaListenerContainerFactory out of the box and, as I found by googling, if you want to override it, the bean name should be "kafkaListenerContainerFactory" (according to this); otherwise you need to state which container factory to use in the containerFactory parameter of @KafkaListener, like this: @KafkaListener(topics = "users", groupId = "user", containerFactory = "containerFactory"). Also, to fix the deserialization problem, it's important to specify type mappings like this:

    // Producer side: the token "userCreated" is mapped to the producer's class.
    producerConfig.put(
            JsonSerializer.TYPE_MAPPINGS,
            "userCreated:org.example.producerservice.dto.UserCreatedEvent");
    // Consumer side: the same token is mapped to the consumer's class.
    consumerConfig.put(
            JsonDeserializer.TYPE_MAPPINGS,
            "userCreated:org.example.userservice.dto.UserCreatedEvent");
    

    This allows the consumer to work out, from the token "userCreated", which type it should deserialize the object to; after the token you specify the class name of your object (see "Mapping Types" in the Spring docs).