Tags: apache-flink, flink-streaming, flink-cep

Order of event processing in Flink and recovery


I have been researching Flink for more than a week now. We consume events from Kafka, and events belonging to a specific object id need to be processed in event-time order. So far my research suggests I should be using keyBy and time windows. Is my understanding correct?

Another question: when one task manager goes down, do only the events belonging to that task manager stop being processed until it comes back up? Is the checkpoint mechanism aware of the events that were not processed, and how does it request those events from Kafka?

Here is the use case behind these questions.

In a call center, agents receive calls and move through different states. For every agent action (login, idle, busy, etc.), we get an agent event carrying that state through Kafka. The requirement is to process the events in order per agent: we cannot process an agent's idle event before its login event. We need to process these in order while also being able to scale out.

In a Flink cluster with parallel processing, we must not end up processing one agent's events in different partitions/task slots and leaving that agent's state corrupted. My question: would keyBy(agentId) divide the stream into substreams and always process each substream in a designated partition, so that the order of event processing is maintained?

Also, if there is an exception, or the task manager for the partition processing a particular agent's data goes down, how does Flink know to request only that agent's events from Kafka after recovery?


Solution

  • You will want to use keyBy(objectId) to partition the stream by object id.

    If you must sort the stream by event time, you have a couple of options. You can use windows to create batches of events that you sort (batch by batch) in a ProcessWindowFunction, or you can create a continuous ordered stream with a KeyedProcessFunction; a sketch of the latter follows.
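
    Here is a minimal sketch of the KeyedProcessFunction approach. AgentEvent, its getEventTime() accessor, and the state name are assumptions for illustration: events are buffered in keyed state and released in event-time order once the watermark passes their timestamps.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SortFunction extends KeyedProcessFunction<String, AgentEvent, AgentEvent> {

        // Keyed state: event timestamp -> all buffered events carrying that timestamp.
        private transient MapState<Long, List<AgentEvent>> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                    "buffer", Types.LONG, Types.LIST(Types.POJO(AgentEvent.class))));
        }

        @Override
        public void processElement(AgentEvent event, Context ctx, Collector<AgentEvent> out)
                throws Exception {
            long ts = event.getEventTime();
            if (ts > ctx.timerService().currentWatermark()) {
                // Buffer the event and fire a timer once the watermark reaches ts.
                List<AgentEvent> sameTime = buffer.get(ts);
                if (sameTime == null) {
                    sameTime = new ArrayList<>();
                }
                sameTime.add(event);
                buffer.put(ts, sameTime);
                ctx.timerService().registerEventTimeTimer(ts);
            }
            // else: the event is late; drop it, or route it to a side output.
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AgentEvent> out)
                throws Exception {
            // Everything buffered at this timestamp is now safe to emit in order.
            for (AgentEvent event : buffer.get(timestamp)) {
                out.collect(event);
            }
            buffer.remove(timestamp);
        }
    }
    ```

    Wired up against the call-center use case, this might look like `events.keyBy(AgentEvent::getAgentId).process(new SortFunction())`, so each agent's events are sorted independently of every other agent's.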

    Checkpoints in Flink are global. They include offsets in Kafka as well as all of the state across the distributed cluster that resulted from having ingested the inputs up to those offsets. Recovery involves restarting the cluster, restoring the cluster's state, rewinding the Kafka consumers to the offsets recorded in the checkpoint, and replaying the events from that point. Note that if your sink is not transactional, this can lead to duplicate results being written.
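
    Checkpointing is off by default and has to be enabled on the execution environment. A minimal sketch, with the interval chosen arbitrarily for illustration:

    ```java
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a global checkpoint of all operator state (including
            // the Kafka offsets) every 60 seconds.
            env.enableCheckpointing(60_000);

            // Exactly-once is the default checkpointing mode; shown for clarity.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // ... build the pipeline (Kafka source, keyBy, processing, sink) ...

            env.execute("checkpointed job");
        }
    }
    ```

    Note that exactly-once here refers to the effect on Flink's internal state; for exactly-once output you also need a transactional sink, as noted above.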

    Update:

    If all of the data for each key is in only one Kafka partition, and if your data is already sorted within Kafka (not globally sorted, but sorted within each key), then Flink will preserve that ordering, even if you do a keyBy. This works because any given Kafka partition is only consumed by one instance of the Flink Kafka source.
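
    For completeness, a sketch of wiring up such a source with the newer KafkaSource API, continuing inside the job's main method from the sketch above (the broker address, topic, group id, and AgentEventDeserializer are hypothetical):

    ```java
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;

    KafkaSource<AgentEvent> source = KafkaSource.<AgentEvent>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("agent-events")
            .setGroupId("agent-pipeline")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new AgentEventDeserializer())
            .build();

    // Each Kafka partition is read by exactly one source subtask, so the
    // per-partition (and hence per-key) ordering survives into the stream.
    DataStream<AgentEvent> events = env.fromSource(
            source,
            WatermarkStrategy.<AgentEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
            "agent-events");
    ```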

    As for the second question, it doesn't matter if only one task manager goes down. All of the task managers will be restarted, and they will all rewind and resume processing from the offsets stored in the most recent checkpoint. The checkpoints are global, spanning the entire cluster -- there's no support for partial recovery.
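
    How aggressively the job restarts is configurable via a restart strategy. One way to set this in the DataStream API, again inside the job's main method (the attempt count and delay are arbitrary):

    ```java
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;

    // On any failure the whole job is restarted from the latest completed
    // checkpoint; there is no per-key or per-partition recovery.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                                // at most 3 restart attempts
            Time.of(10, TimeUnit.SECONDS)));  // wait 10 s between attempts
    ```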