java, spring-boot, apache-kafka, apache-kafka-streams

How to configure `spring.json.trusted.packages` for Kafka Streams + Spring?


Does anyone know how to configure `spring.json.trusted.packages` for Kafka Streams? I'm using the following config:

import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.KafkaStreamsDefaultConfiguration;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        return new KafkaStreamsConfiguration(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
        ));
    }
}

My application.yml is the following:

spring:
  kafka:
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*

But when I change the package of the entity published to Kafka, I receive the following exception:

Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 9 more

It looks like the property added to application.yml does not take effect.


Solution

  • For now I've found a workaround. It's a bit hacky, but I haven't found anything more elegant, and at least it works.

    I've modified my application.yml and added consumer and producer sections with the following configuration:

    spring:
      kafka:
        consumer:
          properties:
            spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
        producer:
          properties:
            spring.json.add.type.headers: false
        streams:
          bootstrap-servers: XXX.XXX.XXX.XXX:9092
          application-id: spring-kafka-test
          properties:
            processing.guarantee: exactly_once
    

    Then I manually applied the configured consumer and producer properties to the serde's deserializer and serializer:

        @Bean
        public JsonSerde<Entity> entityJsonSerde(
                ObjectMapper objectMapper,
                KafkaProperties kafkaProperties) {

            JsonSerde<Entity> serde = new JsonSerde<>(Entity.class, objectMapper);
            // Apply spring.kafka.consumer.properties (incl. spring.json.trusted.packages)
            serde.deserializer().configure(kafkaProperties.buildConsumerProperties(), false);
            // Apply spring.kafka.producer.properties (incl. spring.json.add.type.headers)
            serde.serializer().configure(kafkaProperties.buildProducerProperties(), false);
            return serde;
        }
    

    Now all of the configuration from the consumer and producer sections is applied to my serde. The exception is gone and the app works as expected.
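
    For reference, here is a minimal sketch of how such a serde bean can then be wired into the topology. The topic names are hypothetical; everything else follows the configuration above:

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.Produced;
        import org.springframework.kafka.support.serializer.JsonSerde;

        @Bean
        public KStream<String, Entity> entityStream(
                StreamsBuilder streamsBuilder,
                JsonSerde<Entity> entityJsonSerde) {

            // Read with the explicitly configured serde instead of the default String serde,
            // so the trusted-packages setting from the consumer section is honored.
            KStream<String, Entity> stream = streamsBuilder.stream(
                    "entity-input-topic", // hypothetical topic name
                    Consumed.with(Serdes.String(), entityJsonSerde));

            // Write back with the same serde (type headers disabled via the producer section).
            stream.to("entity-output-topic", // hypothetical topic name
                    Produced.with(Serdes.String(), entityJsonSerde));

            return stream;
        }

    Passing the serde explicitly via Consumed/Produced is what makes this work: Kafka Streams instantiates default serdes itself via reflection, so it never sees the Spring-side `spring.json.*` properties, whereas a Spring-managed serde bean configured as above does.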