I'm trying to use @KafkaListener at the class level together with @KafkaHandler and @Payload on the method arguments, so the payload is validated and each message is routed by type. I need this because the topic carries more than one type of JSON. But I'm getting some errors.
First I got:
Caused by: java.lang.IllegalArgumentException: The class 'br.com.producer.chave.model.Chave' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
But br.com.producer.chave.model.Chave is the model in the producer, which is a separate project; I'm in the consumer. Anyway, I got past this by adding the following to my KafkaConfig:
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
But then the deserializer tries to find that class in my consumer, and I got another error:
Caused by: java.lang.ClassNotFoundException: br.com.producer.chave.model.Chave
I know that when the JsonSerializer serializes data in the producer, it adds the type to a record header, and that this can be disabled.
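For reference, both sides of that header behavior are configurable in spring-kafka. A sketch, where producerProps and consumerProps stand for the respective configuration maps:

```java
// Producer side: do not write the __TypeId__ type header at all.
producerProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

// Consumer side: ignore type headers if they are present and rely on
// TYPE_MAPPINGS / the configured default type instead.
consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
```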
So I was reading the documentation, which says to add TYPE_MAPPINGS on both sides (JsonSerializer.TYPE_MAPPINGS in the producer, JsonDeserializer.TYPE_MAPPINGS in the consumer), but I still got the same errors. These are my classes:
Config Producer:
Note: I have 5 different types, but in TYPE_MAPPINGS I put only two for testing.
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.producer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.producer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("${kafka.value-serializer}")
    private String valueSerializer;

    @Value("${kafka.key-serializer}")
    private String keySerializer;

    @Value("${kafka.client-id}")
    private String clientId;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        JsonSerializer serializer = new JsonSerializer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonSerializer.TYPE_MAPPINGS,
                "chave:br.com.producer.chave.model.Chave," +
                "cpfCnpj:br.com.producer.cpfcnpj.model.CpfCnpj");
        serializer.configure(configProps, false);
        return new DefaultKafkaProducerFactory<>(configProperties(), new StringSerializer(), serializer);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }

    private Map<String, Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslSecurityProtocol);
        configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return configProps;
    }
}
Config Consumer:
@Configuration
@Slf4j
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.consumer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.consumer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("kafka.consumer.enable-auto-commit")
    private String autoCommit;

    @Value("${kafka.consumer.ssl.protocol}")
    private String sslProtocol;

    @Value("${kafka.consumer.ssl.trust-store-location}")
    private String sslTruststoreLocation;

    @Value("${kafka.consumer.ssl.trust-store-password}")
    private String sslTruststorePassword;

    @Value("${kafka.consumer.ssl.trust-store-type}")
    private String sslTruststoreType;

    @Value("${kafka.consumer.client-id}")
    private String clientId;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setSyncCommits(Boolean.TRUE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer deserializer = new JsonDeserializer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonDeserializer.TYPE_MAPPINGS,
                "chave:br.com.consumer.psib.domain.model.Chave," +
                "cpfCnpj:br.com.consumer.psib.domain.model.CpfCnpj");
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        deserializer.configure(configProps, false);
        return new DefaultKafkaConsumerFactory<>(configProperties(), new StringDeserializer(), deserializer, true);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }

    private Map<String, Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(autoCommit));
        return configProps;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
And this is my listener:
@Slf4j
@Component
@KafkaListener(
        topics = "${kafka.consumer.topics.chave-pix-negativa.topic}",
        groupId = "${kafka.consumer.group-id}"
)
public class listener {

    @Autowired
    private IListaNegativaInsertUseCase<Chave> chaveInsertUseCase;

    @Autowired
    private IListaNegativaDeleteUseCase<Chave> chaveDeleteUseCase;

    @Autowired
    private IListaNegativaInsertUseCase<CpfCnpj> cpfCnpjInsertUseCase;

    @Autowired
    private IListaNegativaDeleteUseCase<CpfCnpj> cpfCnpjDeleteUseCase;

    @KafkaHandler
    public void listenerCpfCnpj(@Payload CpfCnpj message, Acknowledgment acknowledgment) {
        // otherthing
    }

    @KafkaHandler
    public void listenerLista(@Payload Chave message, Acknowledgment acknowledgment) {
        // something
    }

    @KafkaHandler(isDefault = true)
    public void defaulthander(@Payload String m) {
        log.error("NAO FOI POSSIVEL {}", m);
    }
}
With this config, TYPE_MAPPINGS doesn't work; I don't know if I'm missing something or using it wrong. I expect @KafkaHandler to match the payload type and dispatch each message to the correct handler.
But br.com.producer.chave.model.Chave is the model in the producer, and I'm in the consumer

The producer needs to share its model classes with the consumer as a JAR, for example via a Maven dependency. Otherwise, you have to fall back to using Kafka's StringDeserializer and handling the deserialization yourself. You would get exactly the same error if you manually called objectMapper.readValue(payload, br.com.producer.chave.model.Chave.class), because that class does not exist on your classpath.
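A minimal sketch of that fallback (the class name and structure are hypothetical; each parser function would typically wrap a JSON mapper call such as json -> objectMapper.readValue(json, LocalChave.class) against a class that exists in the consumer):

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: consume the record value as a plain String, read the
// type token (e.g. from the __TypeId__ header), and route the payload to a
// parser that targets a class present on the consumer's classpath.
public class TypeRouter {

    private final Map<String, Function<String, Object>> parsers;

    public TypeRouter(Map<String, Function<String, Object>> parsers) {
        this.parsers = parsers;
    }

    public Object route(String typeId, String payload) {
        Function<String, Object> parser = parsers.get(typeId);
        if (parser == null) {
            // Analogue of the @KafkaHandler(isDefault = true) case: unknown type.
            throw new IllegalArgumentException("No parser registered for type: " + typeId);
        }
        return parser.apply(payload);
    }
}
```

Each registered token ("chave", "cpfCnpj", ...) then plays the same role the TYPE_MAPPINGS key would have played, but entirely under your control.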
If you've copied/re-defined your own Chave class type, then you're needlessly duplicating code, in my opinion.
Also, trusting * is not recommended. You should use java.util,java.lang,br.com.producer,br.com.consumer instead.
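On the consumer side, that would look something like this (a sketch using the package names from the question):

```java
// Trust only the specific packages you expect to deserialize from,
// instead of every package on the classpath ("*").
configProps.put(JsonDeserializer.TRUSTED_PACKAGES,
        "java.util,java.lang,br.com.producer,br.com.consumer");
```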