Search code examples
spring-bootapache-kafkaapache-kafka-streamsspring-cloud-streamspring-cloud-stream-binder-kafka

Confusion on implementing KStream / Table in Spring Boot


I'm trying to get a sample of some Spring-Boot kafka stream "action" working and I've seemed to end up completely confused :)

I'm receiving JSON data over the wire. I've built a schema in avro which I use to serialize the data:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}

{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

This is as far as I've got:

@Component
class KStreamTransformer {

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {

        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)

            out
        }.groupBy() ???
    }
}

I'm hoping to create a KTable that looks like this:

|platformUID|state|Lat|Lon|Alt| |-----------|-----|---|---|---|

And this is where I've got myself confused.

I'm assuming i want to do a GroupBy on the PlatformUID field but I'm unclear how to actually proceed forward.

Can somebody point me in the right direction?

I think what I'm looking for is to take the input stream and turn it into a KTable with the key being value.getUID() and the value being the value it was before


Solution

  • If platformUID is already the key that the data producer uses, you can use

    KTable ktable = kstream
                     .groupByKey()
                     .reduce((oldValue, newValue) -> newValue)
    

    If not, a KeyValueMapper should be put in .groupBy(), and it looks like

    KTable ktable = kstream
                     .groupBy((k, v) -> v.getPlatformUID())
                     .reduce((oldValue, newValue) -> newValue)
    

    It will create an internal topic which repartition the source topic with the new key.

    For java 7, the syntax of KeyValueMapper is as the following:

    KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
            public K apply(K key, V1 value) {
                return key;
            }
        };