I am new to Flink. I have five unbounded Kafka sources with different data schemas. For each source I want to reduce the messages to the latest one per key, and then outer join all the sources on the same key. So I union them and use a ProcessWindowFunction to merge them into one large object, which is then sent downstream. However, a lot of data is always lost after the union. I think the data is being dropped because it is late.
class CommonObj {
    var id: Long = 0
    var entityType: String? = null
    var timestamp: Long = System.currentTimeMillis()
    val eventMetas: MutableList<EventMeta> = mutableListOf()
    var kafkaStreamValue1: KafkaStreamObj1? = null
    var kafkaStreamValue2: KafkaStreamObj2? = null
    var kafkaStreamValue3: KafkaStreamObj3? = null
    var kafkaStreamValue4: KafkaStreamObj4? = null
    fun buildOutPutObjt(): OutPutObj = ....
}
This is the code for one of the Kafka sources. The logic for the other sources is very similar.
val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1))
val sourceStream1 = env.fromSource(
    getKafkaStream1(params),
    watermarkStrategy,
    "Kafka Source 1"
)
val kafkaSource1 = sourceStream1
    .filter { it != null }
    .map {
        EventObj<KafkaStreamObj1>(
            it.id.toString() + it.entity, //this is key
            it, //obj
            it.sequence, //timestamp
            mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
        )
    }
    .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
    .keyBy { it.key }
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .reduce { v1, v2 ->
        if (v1.obj.timestamp > v2.obj.timestamp) {
            v1
        } else {
            v2
        }
    }
    .map {
        val commonObj = CommonObj()
        commonObj.id = it.obj.id
        commonObj.entityType = it.obj.entityType
        commonObj.timestamp = System.currentTimeMillis()
        commonObj.kafkaStreamValue1 = it.obj.entity // For other kafka stream, it will use kafkaStreamValue2 or kafkaStreamValue3
        commonObj
    }
    .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
This is the union code:
kafkaStream1.union(kafkaStream2, kafkaStream3, kafkaStream4, kafkaStream5)
    .keyBy(keySelector)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .process(EventProcessFunction(params))
    .sinkTo(kafkaSink())
This is the EventProcessFunction:
class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
    override fun open(parameters: Configuration?) {
        super.open(parameters)
        //open data source
    }
    override fun close() {
        //close data source
    }
    override fun process(
        key: String,
        context: Context,
        elements: MutableIterable<CommonObj>,
        out: Collector<OutPutObj>
    ) {
        val commonObj = CommonObj()
        //LTS: latest time stamp
        var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
        val id = elements.first().id
        elements.forEach {
            commonObj.id = it.id
            commonObj.entityType = elements.first().entityType
            commonObj.timestamp = System.currentTimeMillis()
            if (it.id != id) {
                log.error { "id not equal ele id: ${it.id}, first id $id" }
            }
            if (it.kafkaStreamObj1 != null) {
                if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj1LTS = it.timestamp
                commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
            } else if (it.kafkaStreamObj2 != null) {
                if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj2LTS = it.timestamp
                commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
            } else if (it.kafkaStreamObj3 != null) {
                if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj3LTS = it.timestamp
                commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
            } else if (it.kafkaStreamObj4 != null) {
                if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj4LTS = it.timestamp
                commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
            }
        }
        if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
            val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
        }
        if (commonObj.kafkaStreamObj2 == null) {
            val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
        }
        if (commonObj.kafkaStreamObj3 == null) {
            val kafkaStreamObj3Db =
                kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
        }
        if (commonObj.kafkaStreamObj4 == null) {
            val kafkaStreamObj4Db =
                kafkaStreamObj4Repository.kafkaStreamObj4Repository(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
        }
        val outPutObj = commonObj.buildOutPutObjt()
        out.collect(outPutObj)
    }
}
I removed some sensitive information. Why might messages be lost after the union? As far as I know, the watermark after a union is the minimum of the watermarks of all the Kafka sources. So no records should be lost, although the faster Kafka sources may see backpressure.
I have also tried TumblingProcessingTimeWindows without watermarks. But that causes heavy backpressure when the Kafka topics have lag, and then the checkpoints time out. Even with a longer checkpoint timeout, the checkpoint start delay becomes very long (10-30 minutes). The root cause is probably that the DB queries take a long time and block processing after the union. Everything works normally when the Kafka topics have no lag.
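The watermark rule the question relies on can be modelled in a few lines. This is only a sketch in plain Kotlin with no Flink dependency: the function names and sample numbers are made up for illustration, and it assumes zero allowed lateness and non-negative timestamps. It shows that an event-time window drops a record once the combined watermark has reached the end of that record's window, even though the combined watermark is the minimum of the inputs.

```kotlin
// Plain-Kotlin model of event-time lateness; all names here are illustrative.
const val WINDOW_SIZE_MS = 30_000L

// A union forwards the smallest watermark among its active inputs.
fun unionWatermark(inputWatermarks: List<Long>): Long =
    inputWatermarks.minOrNull() ?: Long.MIN_VALUE

// Largest timestamp that still belongs to the tumbling window containing `timestamp`.
fun windowMaxTimestamp(timestamp: Long): Long {
    val start = timestamp - (timestamp % WINDOW_SIZE_MS)
    return start + WINDOW_SIZE_MS - 1
}

// With zero allowed lateness, a record is late (and dropped) when the
// watermark has already reached the last timestamp of its window.
fun isLate(timestamp: Long, watermark: Long): Boolean =
    windowMaxTimestamp(timestamp) <= watermark

fun main() {
    val watermark = unionWatermark(listOf(95_000L, 61_000L, 70_000L))
    println(watermark)                  // 61000
    println(isLate(45_000L, watermark)) // true: window ends at 59999
    println(isLate(62_000L, watermark)) // false: window ends at 89999
}
```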
It seems like you should make two changes.
First, use TumblingProcessingTimeWindows instead of TumblingEventTimeWindows. That should avoid any late events.
Second, use an AggregateFunction instead of the ProcessWindowFunction. The AggregateFunction would use an accumulator (returned by the createAccumulator() method) that stores everything you initialize in your EventProcessFunction.process() method, namely:
val commonObj = CommonObj()
//LTS: latest time stamp
var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
val id = elements.first().id
The function's add() method would do everything that you currently do inside of your forEach loop.
Finally, the function's getResult() method would execute the code that's after your forEach loop, to return the outPutObj result.
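As a rough sketch of that mapping (not your actual job code): the interface below is a local stand-in with the same four methods as Flink's org.apache.flink.api.common.functions.AggregateFunction, declared here only so the example runs without Flink on the classpath. Event, Acc, LatestPerSource and the lookup parameter are hypothetical simplifications, with String payloads, two sources instead of four, and a lambda in place of the repository calls.

```kotlin
// Local stand-in mirroring the methods of Flink's AggregateFunction.
interface AggregateFunction<IN, ACC, OUT> {
    fun createAccumulator(): ACC
    fun add(value: IN, accumulator: ACC): ACC
    fun getResult(accumulator: ACC): OUT
    fun merge(a: ACC, b: ACC): ACC
}

// Hypothetical simplified input: each event carries the payload of one source.
data class Event(val id: Long, val timestamp: Long, val obj1: String? = null, val obj2: String? = null)

// Accumulator: the state that process() used to initialize before its loop.
data class Acc(
    var id: Long = 0,
    var obj1: String? = null, var obj1Lts: Long = Long.MIN_VALUE,
    var obj2: String? = null, var obj2Lts: Long = Long.MIN_VALUE
)

// `lookup` is a hypothetical replacement for the DB repositories.
class LatestPerSource(private val lookup: (Long) -> String?) : AggregateFunction<Event, Acc, String> {
    override fun createAccumulator() = Acc()

    // Former forEach body: keep the newest payload seen for each source.
    override fun add(value: Event, accumulator: Acc): Acc {
        accumulator.id = value.id
        if (value.obj1 != null && value.timestamp >= accumulator.obj1Lts) {
            accumulator.obj1 = value.obj1
            accumulator.obj1Lts = value.timestamp
        }
        if (value.obj2 != null && value.timestamp >= accumulator.obj2Lts) {
            accumulator.obj2 = value.obj2
            accumulator.obj2Lts = value.timestamp
        }
        return accumulator
    }

    // Former post-loop code: fill gaps from the lookup, then build the output.
    override fun getResult(accumulator: Acc): String {
        val obj1 = accumulator.obj1 ?: lookup(accumulator.id)
        val obj2 = accumulator.obj2 ?: lookup(accumulator.id)
        return "id=${accumulator.id} obj1=$obj1 obj2=$obj2"
    }

    // Combine two partial accumulators, again keeping the newest per source.
    override fun merge(a: Acc, b: Acc): Acc {
        if (a.id == 0L) a.id = b.id
        if (b.obj1 != null && b.obj1Lts >= a.obj1Lts) { a.obj1 = b.obj1; a.obj1Lts = b.obj1Lts }
        if (b.obj2 != null && b.obj2Lts >= a.obj2Lts) { a.obj2 = b.obj2; a.obj2Lts = b.obj2Lts }
        return a
    }
}

fun main() {
    val fn = LatestPerSource { "from-db" }
    var acc = fn.createAccumulator()
    acc = fn.add(Event(7, 100, obj1 = "a"), acc)
    acc = fn.add(Event(7, 200, obj1 = "b"), acc)
    acc = fn.add(Event(7, 150, obj1 = "c"), acc) // older than "b", ignored
    println(fn.getResult(acc)) // id=7 obj1=b obj2=from-db
}
```

In the real job, the class would implement Flink's own AggregateFunction and be passed to aggregate() on the windowed stream in place of process(EventProcessFunction(params)). Because the window then keeps only one accumulator per key instead of buffering every element, the state it holds should be much smaller.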