I am using Kafka Streams 2.6.0 (in Spring Boot) and have run into a really weird issue. I am trying to perform a stateful operation (grouping and aggregation) on a stream:
freeTextSignPartialUpdateStream
    .groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
    .aggregate(
        ArrayList::new,
        freeTextSignUtils::updateFreeTextSignUpdateList,
        Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
            .withKeySerde(Serdes.Long())
            .withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
            .withCachingDisabled()
            .withLoggingDisabled()
    )
    .toStream()
    .to(
        storesService.getFreeTextSignUpdatesStoreTopicName(),
        Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
    );
FTS_PARTIAL_UPDATE_LIST is a proper Serdes implementation defined as a constant (similar to FTS_PARTIAL_UPDATE_MSG_SERDE, which is working without any issues).
The weird thing is not that I am getting ser/des errors, but that the value passed to withValueSerde is completely ignored and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG is used instead (currently set to JsonSerdes).
I am using a similar pattern in other stream-processing parts of the application without any issues. Also, when I replace the default JsonSerdes with my FTS_PARTIAL_UPDATE_LIST, this works.
I have checked the withValueSerde documentation, which says that it will fall back to the default Serdes when the input value is null, which it obviously is not (I've checked that in the debugger).
So actually, it was my mistake: the deserializer in my SerDes implementation was returning null.
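
For anyone hitting something similar, a quick round-trip check on the serde would likely have caught this before the topology was involved at all. The following is only a minimal sketch under stated assumptions: plain Java functions stand in for Kafka's Serializer and Deserializer so that it runs without the Kafka client on the classpath, and the class name, the 8-bytes-per-element wire format, and the List<Long> payload are all hypothetical, not taken from my application.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-alone sketch: plain functions stand in for Kafka's Serializer/Deserializer.
// All names and the wire format here are hypothetical.
final class SerdeRoundTripCheck {

    // Encodes a list of longs as 8 bytes per element.
    static byte[] serialize(List<Long> values) {
        ByteBuffer buf = ByteBuffer.allocate(values.size() * Long.BYTES);
        for (long v : values) {
            buf.putLong(v);
        }
        return buf.array();
    }

    // Working deserializer: rebuilds the list from the bytes.
    static List<Long> deserialize(byte[] data) {
        if (data == null) {
            return null; // a null payload stays null
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<Long> out = new ArrayList<>();
        while (buf.remaining() >= Long.BYTES) {
            out.add(buf.getLong());
        }
        return out;
    }

    // Broken deserializer with the same kind of mistake: it always returns null.
    static List<Long> brokenDeserialize(byte[] data) {
        return null;
    }

    // True when deserializing the serialized value gives back an equal value.
    static <T> boolean roundTrips(T value, Function<T, byte[]> ser, Function<byte[], T> de) {
        T back = de.apply(ser.apply(value));
        return value.equals(back);
    }

    public static void main(String[] args) {
        List<Long> sample = List.of(1L, 2L, 3L);
        System.out.println("working deserializer round-trips: "
                + roundTrips(sample, SerdeRoundTripCheck::serialize, SerdeRoundTripCheck::deserialize));
        System.out.println("broken deserializer round-trips: "
                + roundTrips(sample, SerdeRoundTripCheck::serialize, SerdeRoundTripCheck::brokenDeserialize));
    }
}
```

With a real Serde the same idea applies: serialize a sample value, deserialize the bytes, and compare the result with the input in a unit test.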