Does anyone know how to configure spring.json.trusted.packages
for Kafka Streams? I'm using the following config:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        return new KafkaStreamsConfiguration(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
        ));
    }
}
My application.yml is the following:
spring:
  kafka:
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
But when I change the package of the entity published into Kafka, I'm receiving the following exception:
Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 9 more
It looks like the property added to application.yml does not work.
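Note that the message above lists com.mypackage.streams.entity.kafka.* among the trusted packages, yet still rejects a class from the nested com.mypackage.streams.entity.kafka.raw package. One possible reading, which is an assumption and not confirmed behaviour of the spring-kafka version in use, is that each entry is compared to the class's package literally, so the .* suffix is never expanded. The sketch below contrasts literal and wildcard matching; TrustedPackageCheck and its methods are hypothetical names for illustration, not spring-kafka API.

```java
import java.util.List;

// Illustration only: hypothetical helpers, not part of spring-kafka.
final class TrustedPackageCheck {

    /** Returns the package part of a fully qualified class name. */
    static String packageOf(String className) {
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }

    /** Literal comparison: an entry such as "a.b.*" is treated as a plain string. */
    static boolean isTrustedExact(List<String> trusted, String className) {
        return trusted.contains("*") || trusted.contains(packageOf(className));
    }

    /** Wildcard comparison: an entry ending in ".*" also covers nested packages. */
    static boolean isTrustedPattern(List<String> trusted, String className) {
        String pkg = packageOf(className);
        for (String entry : trusted) {
            if (entry.equals("*") || entry.equals(pkg)) {
                return true;
            }
            // "a.b.*" becomes the prefix "a.b." and matches "a.b.c", "a.b.c.d", ...
            if (entry.endsWith(".*") && pkg.startsWith(entry.substring(0, entry.length() - 1))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The trusted list exactly as printed in the exception message.
        List<String> trusted = List.of(
                "java.util",
                "java.lang",
                "com.mypackage.streams.entity.kafka",
                "com.mypackage.streams.entity.kafka.*");
        String className = "com.mypackage.streams.entity.kafka.raw.Entity";

        System.out.println(isTrustedExact(trusted, className));   // prints false
        System.out.println(isTrustedPattern(trusted, className)); // prints true
    }
}
```

If literal matching is indeed what happens, listing com.mypackage.streams.entity.kafka.raw explicitly, or trusting everything with *, would be the entries that pass the first check.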
At the moment I found a workaround. It is a bit hacky, but I haven't found anything more elegant and at least it works now.
I've modified my application.yml and added consumer and producer sections with the following configuration:
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
    producer:
      properties:
        spring.json.add.type.headers: false
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
Then I manually applied the configured consumer and producer properties to the serde's deserializer and serializer:
@Bean
public JsonSerde<Entity> entityJsonSerde(
        ObjectMapper objectMapper,
        KafkaProperties kafkaProperties) {
    JsonSerde<Entity> serde = new JsonSerde<>(Entity.class, objectMapper);
    // Apply spring.kafka.consumer.* settings (including trusted packages) to the value deserializer
    serde.deserializer().configure(kafkaProperties.buildConsumerProperties(), false);
    // Apply spring.kafka.producer.* settings (type headers disabled) to the value serializer
    serde.serializer().configure(kafkaProperties.buildProducerProperties(), false);
    return serde;
}
Now all settings from the producer and consumer sections are applied to my serde object. The exception is gone and the app works as expected.