apache-kafka, apache-kafka-streams

Why is this KStream/KTable topology propagating records that don't pass the filter?


I have the following topology that:

  1. Creates a state store
  2. Filters records based on SOME_CONDITION, maps their values to a new entity, and finally publishes these records to another topic, STATIONS_LOW_CAPACITY_TOPIC

However, I am seeing this on the STATIONS_LOW_CAPACITY_TOPIC topic:

�   null
�   null
�   null
�   {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
�   {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
�   null

That is to say, it's as if the records that didn't pass the filter were also being published to the STATIONS_LOW_CAPACITY_TOPIC topic. How is this possible? How can I prevent them from being published?

This is the Kafka Streams code:

kStream.groupByKey().reduce({ _, newValue -> newValue },        // keep the latest record per key
                Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value -> SOME_CONDITION }          // keep only matching stations
                .mapValues { station ->
                    Stats(XXX)                                  // map to the output entity
                }
                .toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

UPDATE: I've simplified the topology and printed the resulting table. For some reason, the final KTable also contains null-valued records corresponding to upstream records that didn't pass the filter:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value ->
                    val conditionResult = (SOME_CONDITION)
                    println(conditionResult)
                    conditionResult
                }
                .print()

Logs:

false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)

Solution

  • The answer was in the javadoc of KTable.filter(...):

    Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

    That explains why I'm seeing null-valued (tombstone) records sent downstream.
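
    In other words, a KTable filter behaves roughly like this for each incoming update (a sketch of my own to illustrate the javadoc, not Kafka's actual implementation; changelogFilter is a hypothetical name):

    fun <K, V> changelogFilter(key: K, value: V?, predicate: (K, V) -> Boolean): V? =
        when {
            value == null -> null            // tombstone: forwarded as-is, predicate not evaluated
            predicate(key, value) -> value   // passes the predicate: forwarded unchanged
            else -> null                     // dropped: a tombstone is forwarded instead
        }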

    To avoid this, I converted the KTable to a KStream first and then applied the filter:

    kStream.groupByKey().reduce({ _, newValue -> newValue },
                    Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(stationSerde))
                    .toStream()                              // convert first: a record-stream filter forwards no tombstones
                    .filter { _, value -> SOME_CONDITION }
                    .mapValues { station ->
                        StationStats(station.id, station.latitude, station.longitude, ...)
                    }
                    .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

    Result:

    4   {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
    5   {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
    ...
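
    An alternative sketch (untested, reusing the serdes and placeholders from above) keeps the filter on the KTable and instead discards the forwarded tombstones explicitly after converting to a stream:

    kStream.groupByKey().reduce({ _, newValue -> newValue },
                    Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(stationSerde))
                    .filter { _, value -> SOME_CONDITION }   // table filter: dropped records become tombstones
                    .toStream()
                    .filter { _, value -> value != null }    // drop the tombstones before producing
                    .mapValues { station ->
                        StationStats(station.id, station.latitude, station.longitude, ...)
                    }
                    .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

    For this topology the end result is the same; filtering on the KTable side is mainly useful if you still need table semantics for further KTable operations before converting to a stream.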