apache-kafka spring-kafka spring-cloud-stream

Spring Kafka message deserialization fails because the content-type is set to application/json


I am sending a message to Kafka as follows:

private KafkaTemplate<String, MyMessage> kafkaTemplate;

public void sendMessage(MyMessage data) {
    // Topic and record key are passed as headers; the payload is the MyMessage itself
    Message<MyMessage> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY, data.getKey())
            .build();
    kafkaTemplate.send(message);
}

I have my own custom serializer and deserializer for MyMessage. The key is a plain String.
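The custom serializer and deserializer are not shown in the question. As a rough idea of what such a pair does, here is a minimal sketch. It assumes a hypothetical MyMessage with only a key and an object key (the two getters used in the code here) and a made-up length-prefixed byte layout. In a real application this logic would sit behind Kafka's Serializer and Deserializer interfaces; MyMessageCodec and the wire format are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical payload: only getKey() and getObjectKey() appear in the question.
final class MyMessage {
    private final String key;
    private final String objectKey;

    MyMessage(String key, String objectKey) {
        this.key = key;
        this.objectKey = objectKey;
    }

    String getKey() { return key; }
    String getObjectKey() { return objectKey; }
}

// Hypothetical binary codec: each field is a 4-byte length followed by UTF-8 bytes.
final class MyMessageCodec {
    static byte[] serialize(MyMessage message) {
        byte[] key = message.getKey().getBytes(StandardCharsets.UTF_8);
        byte[] objectKey = message.getObjectKey().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + key.length + objectKey.length);
        buffer.putInt(key.length).put(key);
        buffer.putInt(objectKey.length).put(objectKey);
        return buffer.array();
    }

    static MyMessage deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        // Fields are read back in the same order they were written.
        String key = readString(buffer);
        String objectKey = readString(buffer);
        return new MyMessage(key, objectKey);
    }

    private static String readString(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.getInt()];
        buffer.get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

A payload written this way begins with a binary length prefix rather than a `{`, so a converter that expects JSON text has no way to parse it.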

I am trying to process the message as follows:

    @StreamListener
    public void process(@Input("input") KStream<String, MyMessage> myStream) {
        final Serde<String> stringSerde = Serdes.String();
        final MyMessageSerde myMessageSerde = new MyMessageSerde();
        // Re-key each record by its object key, then collect the values per key into a list
        myStream
                .groupBy((key, value) -> value.getObjectKey(), Serialized.with(stringSerde, myMessageSerde))
                .aggregate(ArrayList::new,
                        (newKey, val, agg) -> {
                            agg.add(val);
                            return agg;
                        },
                        Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                .withKeySerde(stringSerde)
                                .withValueSerde(new ArrayListSerde(myMessageSerde)));
    }

    ...
    interface MyStreamProcessor {
        @Input("input")
        KStream<?, ?> input();
    }

This fails with a deserialization error. After debugging, I see that the content-type header of the message is set to application/json, so the framework tries to deserialize the message as JSON. How does the content-type header get set, and how can I override it? I am not sending the messages as JSON but as a byte[], like regular Kafka messages, so I want my custom serializer and deserializer to be used.

What is even stranger is that if I change this to a KTable it works fine.

@StreamListener
public void process(@Input("input") KTable<String, MyMessage> myTable) {
    final Serde<String> stringSerde = Serdes.String();
    final MyMessageSerde myMessageSerde = new MyMessageSerde();
    // Same aggregation as before, but starting from a KTable converted to a stream
    KStream<String, MyMessage> myStream = myTable.toStream();
    myStream
            .groupBy((key, value) -> value.getObjectKey(), Serialized.with(stringSerde, myMessageSerde))
            .aggregate(ArrayList::new,
                    (newKey, val, agg) -> {
                        agg.add(val);
                        return agg;
                    },
                    Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                            .withKeySerde(stringSerde)
                            .withValueSerde(new ArrayListSerde(myMessageSerde)));
}

...
interface MyStreamProcessor {
    @Input("input")
    KTable<?, ?> input();
}

Why is the message deserialized correctly with a KTable? What is the difference in behavior between a KStream and a KTable?


Solution

  • I got this to work by doing the following:

    1. Set the input content type in my application.yml file as follows: spring.cloud.stream.bindings.input.content-type: application/mymessage

    2. Create a MyMessageConverter class that extends AbstractMessageConverter and converts a byte[] to MyMessage. Its constructor also defines the new MimeType:

    public MyMessageConverter() { super(new MimeType("application", "mymessage")); }

    3. Register the converter as a MessageConverter bean:

    @Bean
    @StreamMessageConverter
    public MessageConverter myMessageConverter() {
        return new MyMessageConverter();
    }

    I still have no idea why I had to do all of this for a KStream but not for a KTable. If anyone can explain that, it would be appreciated.
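One way to picture why the steps above help: the converter is chosen by the content type of the binding, so a payload labelled application/json is handed to a JSON converter, while application/mymessage reaches the custom one. The sketch below is a simplified plain-Java model of that dispatch, not Spring Cloud Stream's actual implementation; ContentTypeDispatch and both stand-in converters are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of content-type based conversion; not the framework's real classes.
final class ContentTypeDispatch {
    private final Map<String, Function<byte[], String>> converters = new HashMap<>();

    ContentTypeDispatch() {
        // Stand-in JSON converter: rejects anything that does not look like a JSON object.
        converters.put("application/json", bytes -> {
            String text = new String(bytes, StandardCharsets.UTF_8).trim();
            if (!text.startsWith("{") || !text.endsWith("}")) {
                throw new IllegalArgumentException("payload is not JSON");
            }
            return "json:" + text;
        });
        // Stand-in for a custom converter registered under a custom MIME type.
        converters.put("application/mymessage", bytes -> "custom:" + bytes.length + " bytes");
    }

    // Looks up the converter registered for the content type and applies it to the payload.
    String convert(String contentType, byte[] payload) {
        Function<byte[], String> converter = converters.get(contentType);
        if (converter == null) {
            throw new IllegalArgumentException("no converter for " + contentType);
        }
        return converter.apply(payload);
    }
}
```

In this model, the same binary payload fails under application/json and succeeds under application/mymessage, which mirrors the behavior seen before and after changing the binding's content type.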