I am sending a message to Kafka as follows:
private KafkaTemplate<String, MyMessage> kafkaTemplate;
public void sendMessage(MyMessage data) {
    // Build a Spring Message that carries the topic and record key as headers
    Message<MyMessage> message = MessageBuilder
        .withPayload(data)
        .setHeader(KafkaHeaders.TOPIC, topic)
        .setHeader(KafkaHeaders.MESSAGE_KEY, data.getKey())
        .build();
    kafkaTemplate.send(message);
}
I have my own custom de/serializer for MyMessage. The key is a simple String.
I am trying to process the message as follows:
@StreamListener
public void process(@Input("input") KStream<String, MyMessage> myStream) {
    final Serde<String> stringSerde = Serdes.String();
    final MyMessageSerde myMessageSerde = new MyMessageSerde();
    myStream
        .groupBy((key, value) -> value.getObjectKey(), Serialized.with(stringSerde, myMessageSerde))
        .aggregate(ArrayList::new,
            (newKey, val, agg) -> {
                agg.add(val);
                return agg;
            },
            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                .withKeySerde(stringSerde)
                .withValueSerde(new ArrayListSerde(myMessageSerde)));
    ...
interface MyStreamProcessor {
    @Input("input")
    KStream<?, ?> input();
}
This fails with a deserialization error. After debugging, I see that the content-type header of the message is set to application/json, so it tries to deserialize the message as JSON. How does the content-type header get set, and how can I override it? I am not sending the messages as JSON but as a byte[], like regular Kafka messages, so I want my custom de/serializer to be used.
What is even stranger is that if I change this to a KTable it works fine.
@StreamListener
public void process(@Input("input") KTable<String, MyMessage> myTable) {
    final Serde<String> stringSerde = Serdes.String();
    final MyMessageSerde myMessageSerde = new MyMessageSerde();
    KStream<String, MyMessage> myStream = myTable.toStream();
    myStream
        .groupBy((key, value) -> value.getObjectKey(), Serialized.with(stringSerde, myMessageSerde))
        .aggregate(ArrayList::new,
            (newKey, val, agg) -> {
                agg.add(val);
                return agg;
            },
            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                .withKeySerde(stringSerde)
                .withValueSerde(new ArrayListSerde(myMessageSerde)));
    ...
interface MyStreamProcessor {
    @Input("input")
    KTable<?, ?> input();
}
Why is the message deserialized correctly with a KTable? What is the difference in behavior between a KStream and a KTable?
I got this to work by doing the following:
Set the content type of the input binding in my application.yml file as follows:
spring.cloud.stream.bindings.input.content-type: application/mymessage
Create a custom message converter, MyMessageConverter, which extends AbstractMessageConverter and converts a byte[] to MyMessage. Its constructor also defines the new MimeType:
public MyMessageConverter() {
    super(new MimeType("application", "mymessage"));
}
Register the converter as a bean:
@Bean
@StreamMessageConverter
public MessageConverter myMessageConverter() {
    return new MyMessageConverter();
}
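To illustrate why declaring a custom content type changes which deserialization path runs, here is a minimal, framework-free sketch of converter selection by content type. It is a simplified model and not Spring Cloud Stream's actual implementation: ContentTypeDispatch, fakeJson and fakeMyMessage are hypothetical names, and the application/json default is assumed from what I saw while debugging.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical model of content-type based conversion.
// This is NOT Spring Cloud Stream code; every name here is made up for illustration.
final class ContentTypeDispatch {

    // Maps a content type (e.g. "application/json") to a function that
    // turns the raw payload bytes into an object.
    private final Map<String, Function<byte[], Object>> converters = new LinkedHashMap<>();

    // Content type assumed when the binding does not declare one.
    private final String defaultContentType;

    ContentTypeDispatch(String defaultContentType) {
        this.defaultContentType = defaultContentType;
    }

    void register(String contentType, Function<byte[], Object> converter) {
        converters.put(contentType, converter);
    }

    // Picks the converter from the declared content type, falling back to the default.
    Object convert(byte[] payload, String declaredContentType) {
        String contentType = declaredContentType != null ? declaredContentType : defaultContentType;
        Function<byte[], Object> converter = converters.get(contentType);
        if (converter == null) {
            throw new IllegalStateException("No converter for content type " + contentType);
        }
        return converter.apply(payload);
    }

    // Stand-in for a JSON converter: rejects anything that does not look like a JSON object.
    static Object fakeJson(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("Not JSON: " + text);
        }
        return text;
    }

    // Stand-in for the custom deserializer: wraps the decoded bytes in a tagged string.
    static Object fakeMyMessage(byte[] payload) {
        return "MyMessage(" + new String(payload, StandardCharsets.UTF_8) + ")";
    }
}
```

In this model, a dispatcher created with application/json as the default sends an undeclared payload to the JSON stand-in, which rejects non-JSON bytes. Declaring application/mymessage routes the same bytes to the custom stand-in instead, which mirrors the effect of the content-type setting and the registered converter above.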
I still have no idea why I had to do all of this for a KStream but not for a KTable. If anyone can explain that, it would be appreciated.