Tags: java, spring, spring-boot, apache-kafka, spring-kafka

Spring Kafka model is not in the trusted packages


I'm working on microservices with spring-kafka 2.1.5 and spring-boot 2.0.5.

The first service produces messages to Kafka and the second one consumes them. While consuming, I'm getting this error:

Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage]. 
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

So according to the error message, com.service1.model.TopicMessage is the serialized model from service1. But I'm trying to deserialize the message into the model com.service2.model.ConsumeMessage, which lives in service2, and I'm getting this error.

I found the same question here, tried the format suggested below, and also followed the documentation docs.

Below are my configs.

    @Bean(name = "kafkaConsumerConfig")
    public Map<String, Object> kafkaConsumerConfig() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetconfig);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

        return props;
    }

kafkaConsumerFactory

    @Bean(name = "kafkaConsumerFactory")
    public ConsumerFactory<String, ConsumeMessage> kafkaConsumerFactory() {

        JsonDeserializer<ConsumeMessage> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.service2.model");

        return new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig(),
                new StringDeserializer(), deserializer);
    }

kafkaListenerContainerFactory

    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConcurrency(Integer.parseInt(threads));
        factory.setBatchListener(true);
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
        factory.getContainerProperties().setAckMode(AckMode.BATCH);

        return factory;
    }

Solution

  • You need to disable header precedence in the deserializer:

    /**
     * Construct an instance with the provided target type, and
     * useHeadersIfPresent with a default {@link ObjectMapper}.
     * @param targetType the target type.
     * @param useHeadersIfPresent true to use headers if present and fall back to target
     * type if not.
     * @since 2.2
     */
    public JsonDeserializer(Class<? super T> targetType, boolean useHeadersIfPresent) {

    The useHeadersIfPresent argument must be configured to false. That way the configured target type is used and the type header added by the producer (which points at com.service1.model.TopicMessage) is ignored.
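    Assuming you can upgrade to spring-kafka 2.2, the consumer factory from the question could then be rewritten roughly like this (a sketch; the bean and property names mirror the original config):

    ```java
    @Bean(name = "kafkaConsumerFactory")
    public ConsumerFactory<String, ConsumeMessage> kafkaConsumerFactory() {

        // Pass the target type plus useHeadersIfPresent = false, so the
        // producer's type header (com.service1.model.TopicMessage) is
        // ignored and the payload is always mapped to ConsumeMessage.
        JsonDeserializer<ConsumeMessage> deserializer =
                new JsonDeserializer<>(ConsumeMessage.class, false);
        deserializer.addTrustedPackages("com.service2.model");

        return new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig(),
                new StringDeserializer(), deserializer);
    }
    ```

    With the target type fixed like this, the trusted-packages check no longer sees the producer's class name, so the IllegalArgumentException goes away.
    
    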

    If you can't upgrade to spring-kafka 2.2, you should consider implementing your own JsonDeserializer with similar logic: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java
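    On spring-kafka 2.1.x, such a custom deserializer could be as simple as mapping the JSON payload directly with Jackson, never consulting the type headers. This is a minimal sketch; the class name ConsumeMessageDeserializer is hypothetical, and error handling is simplified:

    ```java
    import java.io.IOException;
    import java.util.Map;

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConsumeMessageDeserializer implements Deserializer<ConsumeMessage> {

        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed
        }

        @Override
        public ConsumeMessage deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                // Always map the payload to ConsumeMessage, ignoring any
                // type header the producer may have added to the record.
                return mapper.readValue(data, ConsumeMessage.class);
            }
            catch (IOException e) {
                throw new SerializationException("Failed to deserialize ConsumeMessage", e);
            }
        }

        @Override
        public void close() {
            // nothing to close
        }
    }
    ```

    You would then pass an instance of this class to DefaultKafkaConsumerFactory in place of the JsonDeserializer, and the trusted-packages check never runs because the type headers are never read.
    
    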