apache-flink, flink-streaming

Flink: Why is data lost when unioning multiple Kafka sources?


I am new to Flink. I have five unbounded Kafka sources with different data schemas. I want to reduce the messages to keep only the latest one per key, and then outer-join all the Kafka sources on the same key. So I use union to combine them and then a ProcessWindowFunction to merge them into one big object, which is sent downstream. But a lot of data is always lost after the union. I think the data is lost because it arrives late.

        class CommonObj {

            var id: Long = 0
            var entityType: String? = null
            var timestamp: Long = System.currentTimeMillis()
            val eventMetas: MutableList<EventMeta> = mutableListOf()

            var kafkaStreamObj1: KafkaStreamObj1? = null
            var kafkaStreamObj2: KafkaStreamObj2? = null
            var kafkaStreamObj3: KafkaStreamObj3? = null
            var kafkaStreamObj4: KafkaStreamObj4? = null

            fun buildOutPutObjt(): OutPutObj = ....
        }

This is the code for one of the Kafka sources. The logic for the other sources is very similar.

        val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
            .withIdleness(Duration.ofMinutes(1))

        val sourceStream1 = env.fromSource(
            getKafkaStream1(params),
            watermarkStrategy,
            "Kafka Source 1"
        )

        val kafkaSource1 = sourceStream1
            .filter { it != null }
            .map {
                EventObj<KafkaStreamObj1>(
                    it.id.toString() + it.entity, //this is key
                    it,                           //obj
                    it.sequence,                  //timestamp
                    mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
                )
            }
            .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
            .keyBy { it.key }
            .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
            .reduce { v1, v2 ->
                if (v1.obj.timestamp > v2.obj.timestamp) {
                    v1
                } else {
                    v2
                }
            }
            .map {
                val commonObj = CommonObj()
                commonObj.id = it.obj.id
                commonObj.entityType = it.obj.entityType
                commonObj.timestamp = System.currentTimeMillis()
                commonObj.kafkaStreamObj1 = it.obj.entity // the other Kafka streams set kafkaStreamObj2, kafkaStreamObj3, etc.
                commonObj
            }
            .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))

This is the union code:

        kafkaSource1.union(kafkaSource2, kafkaSource3, kafkaSource4, kafkaSource5)
            .keyBy(keySelector)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
            .process(EventProcessFunction(params))
            .sinkTo(kafkaSink())

This is the EventProcessFunction:

class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
    override fun open(parameters: Configuration?) {
        super.open(parameters)
        //open data source
    }

    override fun close() {
        //close data source
    }

    override fun process(
        key: String,
        context: Context,
        elements: MutableIterable<CommonObj>,
        out: Collector<OutPutObj>
    ) {
        val commonObj = CommonObj()
        //LTS: latest time stamp
        var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
        val id = elements.first().id

        elements.forEach {
            commonObj.id = it.id
            commonObj.entityType = elements.first().entityType
            commonObj.timestamp = System.currentTimeMillis()
            if (it.id != id) {
                log.error { "id not equal ele id: ${it.id}, first id $id" }
            }

            if (it.kafkaStreamObj1 != null) {
                if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj1LTS = it.timestamp
                commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
            } else if (it.kafkaStreamObj2 != null) {
                if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj2LTS = it.timestamp
                commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
            } else if (it.kafkaStreamObj3 != null) {
                if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj3LTS = it.timestamp
                commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
            } else if (it.kafkaStreamObj4 != null) {
                if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj4LTS = it.timestamp
                commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
            }
        }

        if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
            val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
        }
        if (commonObj.kafkaStreamObj2 == null) {
            val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
        }
        if (commonObj.kafkaStreamObj3 == null) {
            val kafkaStreamObj3Db = kafkaStreamObj3Repository.findKafkaStreamObj3(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
        }
        if (commonObj.kafkaStreamObj4 == null) {
            val kafkaStreamObj4Db = kafkaStreamObj4Repository.findKafkaStreamObj4(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
        }

        val outPutObj = commonObj.buildOutPutObjt()
        out.collect(outPutObj)
    }

}

I deleted some sensitive information. Why might messages be lost after the union? As far as I know, the watermark after a union is the minimum of the watermarks of all the Kafka sources, so it shouldn't lose any records; at most, the faster Kafka sources should see backpressure.

I have also tried TumblingProcessingTimeWindows with no watermarks. But that causes heavy backpressure when the Kafka topics have lag, and then checkpoints time out. Even after increasing the checkpoint timeout, checkpoints are delayed for a long time (10-30 minutes). The root cause is probably that the DB queries take a long time and block processing after the union. Everything is normal when there is no lag in the Kafka topics.


Solution

  • It seems like you should make two changes...

    1. Switch to processing time (instead of event time), i.e. use TumblingProcessingTimeWindows instead of TumblingEventTimeWindows. That should avoid any late events.
    2. Use an aggregation function in the second (post-union) windowing operation, so that Flink doesn't have to save every record in state for 30 seconds. That should reduce the size of your state.

    The AggregateFunction would use an accumulator (returned by the createAccumulator() method) that stores everything you initialize in your EventProcessFunction.process() method, namely:

            val commonObj = CommonObj()
            //LTS: latest time stamp
            var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
            val id = elements.first().id
    
    

    The function's add() method would do everything that you currently do inside of your forEach loop.

    Finally, the function's getResult() method would execute the code that comes after your forEach loop and return the outPutObj result. A minimal sketch of the whole function is shown below.
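
    Here is a minimal, untested sketch of what that AggregateFunction could look like, reusing the CommonObj, OutPutObj, keySelector, and kafkaSink() definitions from your question. One caveat: WindowedStream.aggregate() does not accept a RichAggregateFunction, so there is no open()/close() lifecycle; if your DB fallback lookups need managed connections, pass a slim ProcessWindowFunction as the second argument to aggregate() and run the lookups there on the single pre-aggregated element per key.

        import org.apache.flink.api.common.functions.AggregateFunction
        import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
        import org.apache.flink.streaming.api.windowing.time.Time

        // Accumulator: the partially merged CommonObj plus the latest timestamp
        // seen so far per source (the local variables from your process() method).
        class EventAccumulator {
            val commonObj = CommonObj()
            var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
            var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
        }

        class EventAggregateFunction : AggregateFunction<CommonObj, EventAccumulator, OutPutObj> {

            override fun createAccumulator() = EventAccumulator()

            // Incremental version of your forEach loop: keep only the latest
            // value per source, so state holds one accumulator per key instead
            // of every record received during the 30-second window.
            override fun add(value: CommonObj, acc: EventAccumulator): EventAccumulator {
                acc.commonObj.id = value.id
                acc.commonObj.entityType = value.entityType
                acc.commonObj.timestamp = System.currentTimeMillis()
                if (value.kafkaStreamObj1 != null && value.timestamp >= acc.kafkaStreamObj1LTS) {
                    acc.kafkaStreamObj1LTS = value.timestamp
                    acc.commonObj.kafkaStreamObj1 = value.kafkaStreamObj1
                }
                if (value.kafkaStreamObj2 != null && value.timestamp >= acc.kafkaStreamObj2LTS) {
                    acc.kafkaStreamObj2LTS = value.timestamp
                    acc.commonObj.kafkaStreamObj2 = value.kafkaStreamObj2
                }
                if (value.kafkaStreamObj3 != null && value.timestamp >= acc.kafkaStreamObj3LTS) {
                    acc.kafkaStreamObj3LTS = value.timestamp
                    acc.commonObj.kafkaStreamObj3 = value.kafkaStreamObj3
                }
                if (value.kafkaStreamObj4 != null && value.timestamp >= acc.kafkaStreamObj4LTS) {
                    acc.kafkaStreamObj4LTS = value.timestamp
                    acc.commonObj.kafkaStreamObj4 = value.kafkaStreamObj4
                }
                return acc
            }

            override fun getResult(acc: EventAccumulator): OutPutObj {
                // The DB fallback lookups that currently follow your forEach loop
                // would go here (or in a ProcessWindowFunction, per the note above).
                return acc.commonObj.buildOutPutObjt()
            }

            // Tumbling windows never call merge(); a field-wise merge would only
            // be needed for merging window assigners such as session windows.
            override fun merge(a: EventAccumulator, b: EventAccumulator): EventAccumulator =
                throw UnsupportedOperationException("merge is not used with tumbling windows")
        }

    With both changes applied, the post-union pipeline would become:

        kafkaSource1.union(kafkaSource2, kafkaSource3, kafkaSource4, kafkaSource5)
            .keyBy(keySelector)
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(30000)))
            .aggregate(EventAggregateFunction())
            .sinkTo(kafkaSink())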