Search code examples
javaspringspring-bootapache-kafkaspring-kafka

Send supertype as type information in Kafka JSON Serialization


I'm using Spring Boot to send data from one application to the other using Kafka.

My design uses an interface to declare the data being sent:

package domain;

interface Data {
    public String getData();
    public void setData(String data);
}

Producer

In the source app, I implement this interface as a db Entity.

package persistence;

@Data
class DataEntity implements Data {
    private String data;  // lombok generates getter/setters
}

When an entity is added, I want send it as an update to Kafka using the KafkaTemplate

@Component
class DataPublisher implements ApplicationListener<DataEvent> {
    @Autowired private KafkaTemplate<String,Data> template;

    // I left out DataEvent which is a straightforward ApplicationEvent override
    @EventListener(classes = DataEvent.class)
    public void onApplicationEvent(DataEvent event) {
        template.send("data", (Data) event.getSource());
    }
}
// triggered by this call in a service
    eventPublisher.publishEvent(new DataEvent(updatedData));

The serialization is done via the properties

spring:
    kafka:
        consumer:
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties.spring.json.value.default.type: domain.Data

Looking at kafkacat output, the data is sent fine.

Consumer

On the receiving side I have

@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
    dataService.updateData(data);
}

which results in

Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]

which I understand perfectly fine - the serializer sends a persistence.DataEntity object, but the client expects a domain.Data object. But that's how the design is supposed to be; I want the client to only know about the domain package, not its persistence implementation. (As a side question, where can I see this type header? It's not in the encoded json AFAICT, what am I missing?)

So the question is: how do I force the Spring JsonDeserializer to send domain.Data as the serialized data type?

I did find a TYPE_MAPPING property in the serializer class, but its only documentation is that it "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'" which doesn't explain anything and I can't find an example usage.

EDIT:

I did add

spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity

to the properties of the producer, but that didn't help.


Solution

  • See the documentation.

    You have to provide mapping on both sides.

    However, instead of using the JsonDeserializer, you should use a BytesDeserializer and a BytesJsonMessageConverter (simply add one as a @Bean and Boot will wire it into the container factory).

    That way, the framework will automatically convert to the parameter type.

    Again, see the documentation.