Search code examples
javaspring-bootapache-kafkaapache-kafka-streams

KafkaStreams custom SerDes defined in withValueSerde is ignored


I am using Kafka Streams 2.6.0 (in Spring Boot) and have run into really weird issue. I am trying to perform stateful operation (grouping and aggregation) on stream:

        freeTextSignPartialUpdateStream
                .groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
                .aggregate(
                        ArrayList::new,
                        freeTextSignUtils::updateFreeTextSignUpdateList,
                        Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
                                .withCachingDisabled()
                                .withLoggingDisabled()
                )
                .toStream()
                .to(
                        storesService.getFreeTextSignUpdatesStoreTopicName(),
                        Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
                );

FTS_PARTIAL_UPDATE_LIST is a proper Serdes implementation defined as constant (similar to FTS_PARTIAL_UPDATE_MSG_SERDE which is working without any issues).

The weird thing is not that I am getting ser/des errors, but the value defined in withValueSerde is completely ignored and instead StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG is used (which is currently set to JsonSerdes).

I am using similar pattern as above in other stream processing parts of the application without any issues. Also, when I replace the default JsonSerdes with my FTS_PARTIAL_UPDATE_LIST this works.

I have checked withValueSerde documentation where it says that it will fallback to default Serdes when input value is null, which is obviously not (I've checked that in debuger).


Solution

  • So actually, it was my mistake, since the deserializer in SerDes implementation was returning null.