Search code examples

Spring kafka message deserialization fails because the content-type is set to application/json

I am sending a message to Kafka as follows:

private KafkaTemplate<String, MyMessage > kafkaTemplate;

public void sendMessage(MyMessage data) {

    Message<MyMessage > message = MessageBuilder

I have my own custom de/serializer for MyMessage, The key is a simple String

I am trying to process the message as follows:

    public void process(@Input("input") KStream<String,MyMessage> myStream){
            final Serde<String> stringSerde = Serdes.String();
            final MyMessageSerde myMessageSerde = new MyMessageSerde();
                    .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                            (newKey,val,agg) -> {
                                return agg;
                            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                    .withValueSerde(new ArrayListSerde(myMessageSerde)));

    interface MyStreamProcessor {
        KStream<?, ?> input();

This is failing with a deserialization error and after debugging I see the reason is that the content-type header of the message is set to application/json and therefore it is trying to deserialize the message as JSON. How does the content-type header get set and how can I override it? I am not sending the messages as JSON but rather as a byte[] like regular Kafka messages so I want my custom de/serializer to be used

What is even stranger is that if I change this to a KTable it works fine.

public void process(@Input("input") KTable<String,MyMessage> myTable){
        final Serde<String> stringSerde = Serdes.String();
        final MyMessageSerde myMessageSerde = new MyMessageSerde();
        KStream<String,MyMessage> myStream = myTable.toStream();
                .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                        (newKey,val,agg) -> {
                            return agg;
                        Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                .withValueSerde(new ArrayListSerde(myMessageSerde)));

interface MyStreamProcessor {
    KTable<?, ?> input();

Why with a KTable is the message deserialized correctly? What is the difference in the behavior between a KStream and a KTable?


  • I got this to work by doing the following:

    1. set the input content type header in my application.yml file as follows: application/mymessage

    2. Create a CustomMessageConverter which extends AbstractMessageConverter to convert from a byte[] to MyMessage. It also defines the new MimeType

    public MyMappingConverter() { super(new MimeType("application", "mymessage")); }

    1. Create a MessageConverter bean

    @Bean @StreamMessageConverter public MessageConverter myMessageConverter() { return new MyMessageConverter(); }

    I still have no idea why I had to do all of this for a KStream but not for a KTable, if anyone can explain that it would be appreciated.