Search code examples
javaapache-kafkakafka-consumer-apispring-kafka

Error to use @KafkaListener at class level


I'm trying to use @KafkaListener at class level, and use @KafkaHandler with @Payload at the args to validate the message. I need this because the Topic has more than one type of Json. But I'm getting some errors.

First I got:

Caused by: java.lang.IllegalArgumentException: The class 'br.com.producer.chave.model.Chave' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

But br.com.producer.chave.model.Chave is the model in Producer and I'm in consumer, is another project, anyway, I solve this adding at KafkaConfig

configProps.put(JsonDeserializer.TRUSTED_PACKAGES,"*");

But the Deserializer tries to find this class in my consumer, and I got another error:

Caused by: java.lang.ClassNotFoundException: br.com.producer.chave.model.Chave

I know that when Json serializes data in the producer, it adds the type to the header, and I can disable it.

So, I was reading the documentation and it says to add JsonDeserializer.TYPE_MAPPINGS in the producer and in the consumer, and I got the same errors. These are my classes:

Config Producer: obs: I had 5 different types, but in TYPE_MAPPINGS I put only two for test.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.producer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.producer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("${kafka.value-serializer}")
    private String valueSerializer;

    @Value("${kafka.key-serializer}")
    private String keySerializer;

    @Value("${kafka.client-id}")
    private String clientId;
    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        JsonSerializer serializer = new JsonSerializer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonSerializer.TYPE_MAPPINGS,
                        "chave:br.com.producer.chave.model.Chave," +
                        "cpfCnpj:br.com.producer.cpfcnpj.model.CpfCnpj");
        serializer.configure(configProps, false);
        return new DefaultKafkaProducerFactory<>(configProperties(), new StringSerializer(), serializer);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }

    private Map<String,Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslSecurityProtocol);
        configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return configProps;
    }
}

Config Consumer:

@Configuration
@Slf4j
public class KafkaConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.consumer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.consumer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("kafka.consumer.enable-auto-commit")
    private String autoCommit;

    @Value("${kafka.consumer.ssl.protocol}")
    private String sslProtocol;

    @Value("${kafka.consumer.ssl.trust-store-location}")
    private String sslTruststoreLocation;

    @Value("${kafka.consumer.ssl.trust-store-password}")
    private String sslTruststorePassword;

    @Value("${kafka.consumer.ssl.trust-store-type}")
    private String sslTruststoreType;

    @Value("${kafka.consumer.client-id}")
    private String clientId;


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>

    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setSyncCommits(Boolean.TRUE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer deserializer = new JsonDeserializer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonDeserializer.TYPE_MAPPINGS,
                "chave:br.com.consumer.psib.domain.model.Chave," +
                        "cpfCnpj:br.com.consumer.psib.domain.model.CpfCnpj");
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        deserializer.configure(configProps, false);
        return new DefaultKafkaConsumerFactory<>(configProperties(), new StringDeserializer(), deserializer,true);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }


    private Map<String, Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(autoCommit));

        return configProps;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

And this is my listener:

@Slf4j
@Component
@KafkaListener(
        topics = "${kafka.consumer.topics.chave-pix-negativa.topic}",
        groupId = "${kafka.consumer.group-id}"
)
public class listener {

    @Autowired
    private IListaNegativaInsertUseCase<Chave> chaveInsertUseCase;
    @Autowired
    private IListaNegativaDeleteUseCase<Chave> chaveDeleteUseCase;

    @Autowired
    private IListaNegativaInsertUseCase<CpfCnpj> cpfCnpjInsertUseCase;

    @Autowired
    private IListaNegativaDeleteUseCase<CpfCnpj> cpfCnpjDeleteUseCase;


    @KafkaHandler
    public void listenerCpfCnpj(@Payload CpfCnpj message, Acknowledgment acknowledgment) {
       //otherthing
    }


    @KafkaHandler
    public void listenerLista(@Payload Chave message, Acknowledgment acknowledgment) {
     //something

    }

    @KafkaHandler(isDefault = true)
    public void defaulthander (@Payload String m){
        log.error("NAO FOI POSSIVEL {}", m);
    }
}

With this config the TYPE_MAPPINGS doesn't work, I don't know if I miss something or I'm using it wrong.

I expect the @KafkaHandler validated the payload and goes to the correct handler according to the type of the message.


Solution

  • But, br.com.producer.chave.model.Chave is the model in Producer and i'm in consumer

    The producer needs to share its built models with the consumer as a JAR. Perhaps via Maven dependency, for example.

    Otherwise, you have to fallback to using Kafka StringDeserializer and handling the class deserialization on your own.

    You would get the exact same error if you tried to manually use objectMapper.readValue(payload, br.com.producer.chave.model.Chave.class), as that class does not exist on your classpath.

    If you've copied/re-defined your own chave class type, then you're needlessly duplicating code, IMO.

    Also trusting * is not recommended. You should try to use java.util,java.lang,br.com.producer,br.com.consumer instead.