I am new to Kafka Streams and am using version 1.0.0. I would like to set a new key for a KTable from one of the values.
When using a KStream, this can be done with the selectKey() method, like this:
kstream.selectKey((k, v) -> v.newKey)
However, KTable has no such method. The only way seems to be converting the KTable to a KStream. Any thoughts on this issue? Is changing a key against the design of KTable?
If you want to set a new key, you need to re-group the KTable:
KTable newTable = table.groupBy(/*put select key function here*/)
.aggregate(...);
Because a key must be unique in a KTable (in contrast to a KStream), you must specify an aggregation function that combines all records with the same (new) key into a single value.
Since Kafka 2.5, Kafka Streams also supports the KStream#toTable() operator. Thus, it is also possible to do table.toStream().selectKey(...).toTable(). There are advantages and disadvantages to both approaches.
The main disadvantage of using toTable() is that it repartitions the input data based on the new key, which leads to interleaved writes into the repartition topic and thus to out-of-order data. While the first approach via groupBy() uses the same implementation, the aggregation function lets you resolve "conflicts" explicitly. With the toTable() operator, a "blind" upsert based on the offset order of the repartition topic is done (this is actually similar to the code example in the other answers).
Example:
Key | Value
A | (a,1)
B | (a,2)
If you re-key on a, your output table would be one of the following two (but it is not defined which one):
Key | Value
a | 1

or

Key | Value
a | 2
The operation to "rekey" a table is semantically always ill-defined.
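To make the difference in the example above concrete, here is a minimal plain-Java simulation. It is not Kafka Streams code: the class RekeyDemo and the methods blindUpsert and aggregate are hypothetical names made up for illustration, and summing is just one possible conflict-resolution function. It also ignores the subtractor that a real KTable aggregation needs in order to handle updates and deletes. It only shows that a blind upsert depends on the arrival order in the repartition topic, while an explicit, order-insensitive aggregation does not.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RekeyDemo {
    // Blind upsert: whichever record arrives last for a key wins.
    static Map<String, Integer> blindUpsert(List<Map.Entry<String, Integer>> arrivals) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : arrivals) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    // Explicit aggregation: records that collide on the new key are merged
    // (summing is an assumption; any commutative function would do here).
    static Map<String, Integer> aggregate(List<Map.Entry<String, Integer>> arrivals) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : arrivals) {
            table.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        // Rows A -> (a,1) and B -> (a,2) after re-keying on "a",
        // in the two possible arrival orders.
        List<Map.Entry<String, Integer>> order1 = List.of(Map.entry("a", 1), Map.entry("a", 2));
        List<Map.Entry<String, Integer>> order2 = List.of(Map.entry("a", 2), Map.entry("a", 1));

        System.out.println(blindUpsert(order1)); // {a=2}
        System.out.println(blindUpsert(order2)); // {a=1}
        System.out.println(aggregate(order1));   // {a=3}
        System.out.println(aggregate(order2));   // {a=3}
    }
}
```

The two blindUpsert results differ only because of arrival order, which is exactly what you cannot control after a repartition. Whether a sum, a max, or some other merge is the right aggregation depends on your data.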