My Kafka Streams application is consuming from a Kafka topic that uses the following key-value layout:
String.class -> HistoryEvent.class
This can be confirmed by printing the topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 -- SUCCESS #C:\Daten\file-service\in\crypto.p12
"flow1" is the String
key and the part after --
is the serialized value.
My flow is set up like this:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
So as far as I know, I am telling it to consume the topic using the String and HistoryEvent serdes, as this is what is in the topic. I then 'rekey' it to a combined key, which should be stored locally using the provided serde for HistoryEventKey.class. As far as I understand, this will cause an additional topic to be created (it can be seen with the topic list in the Kafka container) with the new key. This is fine.
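For reference, the actual HistoryEventKey and HistoryEventKeySerde classes are not included here; a stripped-down sketch of what they might look like is below (the field names and the delimiter-based encoding are purely illustrative):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative combined key: the original String key plus the event identifier.
public class HistoryEventKey {
    final String key;
    final String identifier;

    public HistoryEventKey(String key, String identifier) {
        this.key = key;
        this.identifier = identifier;
    }
}

// Illustrative serde encoding both fields as a single delimited UTF-8 string.
// configure() and close() have default no-op implementations in Kafka 2.1+.
class HistoryEventKeySerde implements Serde<HistoryEventKey> {
    @Override
    public Serializer<HistoryEventKey> serializer() {
        return (topic, k) -> k == null
                ? null
                : (k.key + "|" + k.identifier).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Deserializer<HistoryEventKey> deserializer() {
        return (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
            return new HistoryEventKey(parts[0], parts[1]);
        };
    }
}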
Now the problem is that the application is unable to start up, even from a clean environment with just that one record in the topic:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
It is kind of hard to tell from the message where exactly the issue is. It says it is in my base topic, but that is not possible, as the key there is not of type HistoryEventKey. Since I have provided a serde for HistoryEventKey in the reduce, it also cannot be the local store.
The only thing that makes sense to me is that it is related to the selectKey operation, which causes a repartitioning and a new topic. However, I am not able to figure out how I can provide the serde to that operation. I do not want to set it as a default, because it is not the default key serde.
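For completeness, the alternative I want to avoid would be registering the serdes as application-wide defaults in the StreamsConfig, roughly like this (the application id is just a placeholder):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "file-service-test");   // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Making these the application-wide defaults would be wrong here, because most
// keys in the topology (including the source topic) are plain Strings.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, HistoryEventKeySerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, HistoryEventSerde.class);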
After doing some more debugging of the execution I was able to figure out that the new topic is created in the groupByKey step. You can provide a Grouped instance, which offers the possibility to specify the Serde used for the key and the value:
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
.withKeySerde(new HistoryEventKeySerde())
.withValueSerde(new HistoryEventSerde())
)
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
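As a side note, when no explicit name for the repartition topic is needed, Grouped.with offers the same configuration a bit more compactly:

// Equivalent to Grouped.as(null).withKeySerde(...).withValueSerde(...)
.groupByKey(Grouped.with(new HistoryEventKeySerde(), new HistoryEventSerde()))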