I'm trying to get a sample Spring Boot Kafka Streams "action" working and I seem to have ended up completely confused :)
I'm receiving JSON data over the wire. I've built a schema in Avro which I use to serialize the data:
{
    "UID": "XJ3_112",
    "type": "11X",
    "state": "PLATFORM_INITIALIZED",
    "fuelremaining": 0,
    "latitude": 50.1232,
    "longitude": -119.257,
    "altitude": 0,
    "time": "2018-07-18T00:00:13.9966Z"
}
{
    "platformUID": "BSG_SS_1_4",
    "type": "OB_334_11",
    "state": "ON_STATION",
    "fuelremaining": -1,
    "latitude": 56.1623,
    "longitude": -44.5614,
    "altitude": 519174,
    "time": "2018-07-18T00:01:43.0871Z"
}
This is as far as I've got:
@Component
class KStreamTransformer {
    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>): KStream<*, TestEntity> {
        return input.flatMapValues { value ->
            val out = Arrays.asList(value)
            out
        }.groupBy() ???
    }
}
I'm hoping to create a KTable that looks like this:
|platformUID|state|Lat|Lon|Alt|
|-----------|-----|---|---|---|
And this is where I've got myself confused.
I'm assuming I want to do a groupBy on the platformUID field, but I'm unclear how to actually proceed.
Can somebody point me in the right direction?
I think what I'm looking for is to take the input stream and turn it into a KTable with the key being value.getUID() and the value being the value it was before.
If platformUID is already the key that the data producer uses, you can use:
KTable ktable = kstream
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue);
If not, pass a KeyValueMapper to .groupBy(), like this:
KTable ktable = kstream
    .groupBy((k, v) -> v.getPlatformUID())
    .reduce((oldValue, newValue) -> newValue);
This creates an internal topic that repartitions the source topic's records by the new key.
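For Java 7, which has no lambdas, the KeyValueMapper is written as an anonymous class:
KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
    @Override
    public K apply(K key, V1 value) {
        // Return the new key here; this placeholder returns the existing key unchanged.
        return key;
    }
};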
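To make it clearer what the groupBy(...).reduce((oldValue, newValue) -> newValue) pattern above ends up holding, here is a minimal plain-Java sketch. It does not use the Kafka Streams API at all; it only mimics the "latest value per key" result with a Map. The Reading class is a hypothetical, cut-down stand-in for TestEntity, groupAndReduce is a made-up helper name, and the third sample record is invented to show an update for an existing key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java illustration only: this does not use the Kafka Streams API.
class LatestPerKeySketch {

    // Hypothetical stand-in for TestEntity, reduced to two fields.
    static final class Reading {
        final String platformUID;
        final String state;

        Reading(String platformUID, String state) {
            this.platformUID = platformUID;
            this.state = state;
        }
    }

    // Mimics groupBy(keySelector).reduce(reducer): one entry per key,
    // updated record by record in arrival order.
    static <K, V> Map<K, V> groupAndReduce(List<V> records,
                                           Function<V, K> keySelector,
                                           BinaryOperator<V> reducer) {
        Map<K, V> table = new LinkedHashMap<>();
        for (V record : records) {
            // merge() stores the record if the key is new, otherwise
            // it applies reducer(oldValue, newValue) and stores the result.
            table.merge(keySelector.apply(record), record, reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Reading> stream = Arrays.asList(
                new Reading("XJ3_112", "PLATFORM_INITIALIZED"),
                new Reading("BSG_SS_1_4", "ON_STATION"),
                // Invented record: a later update for an existing key.
                new Reading("XJ3_112", "ON_STATION"));

        Map<String, Reading> table = groupAndReduce(
                stream, r -> r.platformUID, (oldValue, newValue) -> newValue);

        // One line per platformUID, showing its most recent state.
        for (Map.Entry<String, Reading> e : table.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().state);
        }
    }
}
```

With this reducer the table ends up with two entries, and the XJ3_112 entry holds the later record, which is the same "keep the newest value for each key" behaviour the reduce call above asks the KTable for. The real KTable is additionally backed by a state store and updated continuously, which this sketch leaves out.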