apache-flink, flink-streaming

Flink event time processing in lost connection scenarios


Flink provides an example here: https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink that describes a scenario where someone is playing a game, loses connection while in the subway, and then, once back online, all the buffered data arrives and can be sorted and processed.

My understanding is that if there are more players, there are two options:

  1. All the other players are delayed, waiting for this user to regain connectivity and send his data so that the watermark can be pushed forward;

  2. This user is classified as idle, allowing the watermark to move forward, and when he reconnects all of his data goes to the late data stream;

I would like a third option: each user is processed independently, with its own watermark, for his own session window. Ideally I would even use ingestion time, so that when he gets his connection back all of his data goes into one session and is ordered by event timestamp once that session closes; the session would close once there is a large enough gap between the current time and the last (ingestion) timestamp seen in the window, which is exactly what a session window's inactivity gap guarantees. I don't want the watermark to get stuck when one user loses connection, and I don't want to manage idle states either: all the other events should keep being processed normally, and when the disconnected user comes back none of his data should be classified as late just because the watermark advanced while he was offline.

How could I implement the requirement above? I've been having a hard time working on scenarios like this because the watermark is global. Is there a simple explanation for why there are no per-key watermarks?

Thank you in advance!


Solution

  • The closest Flink's watermarking comes to supporting this directly is probably its support for per-Kafka-partition watermarking, which isn't really a practical solution to the situation you describe (since having a Kafka partition per user isn't realistic).

    What you can do instead is simply ignore watermarking and implement the logic yourself with a KeyedProcessFunction, as sketched below.

    BTW, there was recently a thread about this on both the flink-user and flink-dev mailing lists under the subject Per Key Grained Watermark Support.
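
Here is a minimal sketch of that idea, not a definitive implementation. It assumes a hypothetical GameEvent type (with userId, eventTimestamp, and payload fields) and an arbitrary 5-minute inactivity gap; adjust both to your actual event type and session semantics. Each key buffers its own events and re-arms a processing-time timer on every new event, so the "watermark" is effectively per key: one user going offline never delays anyone else, and nothing is ever treated as late. When the gap expires, the buffered events are sorted by event timestamp and emitted as one session.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Hypothetical event type; only these fields matter for the sketch.
class GameEvent {
    public String userId;
    public long eventTimestamp; // event time as reported by the device
    public String payload;
    public GameEvent() {}       // no-arg constructor for Flink's POJO serializer
}

public class PerUserSessionFunction
        extends KeyedProcessFunction<String, GameEvent, List<GameEvent>> {

    // Assumed inactivity gap that closes a user's session.
    private static final long SESSION_GAP_MS = 5 * 60 * 1000;

    private transient ListState<GameEvent> buffer;  // events of the currently open session
    private transient ValueState<Long> closeTimer;  // the timer currently registered, if any

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("session-buffer", GameEvent.class));
        closeTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("close-timer", Long.class));
    }

    @Override
    public void processElement(GameEvent event, Context ctx, Collector<List<GameEvent>> out)
            throws Exception {
        // Buffer the event; ordering is fixed only when the session closes.
        buffer.add(event);

        // Re-arm a processing-time timer: the session closes after a quiet period measured
        // on the processing clock, so it behaves like ingestion time and is independent per key.
        Long previous = closeTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + SESSION_GAP_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        closeTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<GameEvent>> out)
            throws Exception {
        // The gap expired for this key only: sort the buffered events by event time
        // and emit them as one session.
        List<GameEvent> session = new ArrayList<>();
        for (GameEvent e : buffer.get()) {
            session.add(e);
        }
        session.sort((a, b) -> Long.compare(a.eventTimestamp, b.eventTimestamp));
        out.collect(session);

        buffer.clear();
        closeTimer.clear();
    }
}
```

You would apply it with something like events.keyBy(e -> e.userId).process(new PerUserSessionFunction()). Because the timers use processing time rather than watermarks, no idleness handling is needed and late data simply cannot occur; the trade-off is that session boundaries depend on when the data actually arrives, which is essentially the ingestion-time behaviour the question asks for.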