spring-boot spring-integration spring-kafka jackson-databind

InvalidDefinitionException occurs if a ResolvableType is already in the header


We use Spring Integration and Spring Kafka in our project. After updating from Spring Boot 3.1.7 to 3.2.1, we get an exception when the messages are serialized.

Given the flow:

@Bean
IntegrationFlow sendToKafkaFlow() {
    return IntegrationFlow.from("sendToKafkaFlow.input")
            .transform(Transformers.toJson()) // 1
            // some more handlers/header enrichers
            // .headerFilter(JsonHeaders.RESOLVABLE_TYPE) // 2
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate) // 3
                    .topic("someTopic")
            )
            .get();
}

Every time we send a message through this flow, we see the following exception:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Direct self-reference leading to cycle (through reference chain: org.springframework.core.ResolvableType["componentType"]->org.springframework.core.ResolvableType["componentType"])

The transformer in step (1) creates the json_resolvableType header (JsonHeaders.RESOLVABLE_TYPE). Within the outboundChannelAdapter (3), the DefaultKafkaHeaderMapper is used to create the Kafka headers. As soon as this mapper tries to map the ResolvableType value, serialization fails with the following error and the header is not mapped.

    com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Direct self-reference leading to cycle (through reference chain: org.springframework.core.ResolvableType["componentType"]->org.springframework.core.ResolvableType["componentType"])
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
    at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1308)
(3) at com.fasterxml.jackson.databind.ser.BeanPropertyWriter._handleSelfReference(BeanPropertyWriter.java:948)
(2) at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:726)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
(1) at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:479)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:318)
    at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4719)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3987)

As far as I could see in the debugger, the componentType is null and is replaced by an EmptyInstance (1). This EmptyInstance is then serialized as well. The EmptyInstance itself returns the same EmptyInstance as its componentType (2). This leads to the self-reference (3).
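
To make the three steps easier to follow, here is a minimal, dependency-free sketch of the mechanism as I understand it. TypeInfo and describe are hypothetical stand-ins I made up for illustration; they are not Spring's ResolvableType or Jackson's serializer. The sketch assumes the failure is triggered by an identity check (the property value is the very object currently being serialized), which is what the _handleSelfReference frame in the stack trace suggests.

```java
public class SelfReferenceSketch {

    /** Hypothetical stand-in for ResolvableType; NOT the Spring class. */
    public static final class TypeInfo {
        /** Shared empty instance, returned whenever there is no component type. */
        public static final TypeInfo NONE = new TypeInfo(null);

        private final Class<?> raw;

        public TypeInfo(Class<?> raw) {
            this.raw = raw;
        }

        /** Never returns null: non-array types (and NONE itself) yield NONE. */
        public TypeInfo getComponentType() {
            if (raw != null && raw.isArray()) {
                return new TypeInfo(raw.getComponentType());
            }
            return NONE;
        }
    }

    /**
     * Follows the componentType property the way a bean serializer would,
     * and stops as soon as a property value is the bean being visited.
     */
    public static String describe(TypeInfo start) {
        TypeInfo bean = start;
        int depth = 0;
        while (true) {
            TypeInfo value = bean.getComponentType();
            if (value == bean) { // assumed identity check, as in step (3)
                return "self-reference at depth " + depth;
            }
            bean = value; // steps (1) and (2): descend into the nested value
            depth++;
        }
    }

    public static void main(String[] args) {
        // A plain (non-array) type: its component type is NONE, and NONE points to itself.
        System.out.println(describe(new TypeInfo(String.class)));
    }
}
```

In this sketch any type eventually reaches the shared empty instance, so the self-reference is hit regardless of the payload type, which would match what we observe for every message.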

If we remove this header (step 2 in the flow, currently commented out), the exception is gone, but the header is then missing.

I tried to switch off fail-on-self-references in the application.yml:

spring:   
  jackson:
    serialization:
      fail-on-self-references: false

But that does not work. Unfortunately, we use this pattern (transform, do something, send to Kafka) quite often, in different applications. For that reason, modifying the HeaderMapper, or changing the flow so that the JSON serialization happens within the KafkaTemplate, would take considerable effort.

Is there anything else we can try to avoid this "Direct self-reference leading to cycle" exception for the ResolvableType header?


Solution

  • After some testing I found out that this is not a new exception. In the past it was "hidden" as a debug log: https://github.com/spring-projects/spring-kafka/commit/a57c319f78438ace4b0be3e89ed3398638e47abd#diff-0b0eca71e2fe3c998d8f8c9fb488081a6b4ef2c62190f2e55b38278e84a1b8dcL297

    Since we do not need the RESOLVABLE_TYPE header, we created a DefaultKafkaHeaderMapper that ignores this header:

        @Bean
        @ConditionalOnMissingBean
        public DefaultKafkaHeaderMapper kafkaHeaderMapper(ObjectMapper mapper) {
            return new DefaultKafkaHeaderMapper(mapper, "!" + MessageHeaders.ID,
                "!" + MessageHeaders.TIMESTAMP,
                "!" + JsonHeaders.RESOLVABLE_TYPE,
                "*");
        }
    

    and injected it into the outboundChannelAdapter:

        @Bean
        IntegrationFlow sendToKafkaFlow(DefaultKafkaHeaderMapper kafkaHeaderMapper) {
            return IntegrationFlow.from("sendToKafkaFlow.input")
                .transform(Transformers.toJson())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                       .topic("someTopic")
                       // use the mapper bean that excludes JsonHeaders.RESOLVABLE_TYPE
                       .headerMapper(kafkaHeaderMapper)
                )
                .get();
        }
    

    However, this does not solve the general issue: ResolvableType cannot be serialized to JSON with Jackson's default BeanSerializer in all cases.
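
For readers wondering why the "*" pattern comes last in the mapper bean: as far as I understand the header mapper, the patterns are applied in order and the first match decides, with a leading "!" meaning "do not map". The following is only a rough, dependency-free sketch of that idea. HeaderPatternSketch and shouldMap are hypothetical names of mine, not Spring Kafka API, and the wildcard handling is a simplification. The header names "id", "timestamp" and "json_resolvableType" are what I assume the constants MessageHeaders.ID, MessageHeaders.TIMESTAMP and JsonHeaders.RESOLVABLE_TYPE resolve to.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HeaderPatternSketch {

    /**
     * Returns true if the header would be mapped, assuming patterns are
     * checked in order and the first matching pattern decides.
     */
    public static boolean shouldMap(String header, String... patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String glob = negated ? pattern.substring(1) : pattern;
            if (matches(glob, header)) {
                return !negated; // first match wins, positive or negative
            }
        }
        return false; // no pattern matched: header is not mapped
    }

    /** Simplified glob match: '*' matches any sequence, everything else is literal. */
    private static boolean matches(String glob, String header) {
        String regex = Arrays.stream(glob.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return header.matches(regex);
    }

    public static void main(String[] args) {
        String[] patterns = {"!id", "!timestamp", "!json_resolvableType", "*"};
        System.out.println(shouldMap("json_resolvableType", patterns)); // excluded by its "!" pattern
        System.out.println(shouldMap("myCustomHeader", patterns));      // falls through to "*"
    }
}
```

Under these assumptions the order matters: if "*" were listed first, it would match every header before any of the "!" patterns were reached, and the RESOLVABLE_TYPE header would be mapped again.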