Tags: apache-kafka, apache-flink, flink-streaming, flink-sql, flink-cep

Apache Flink delay processing of certain events


I have a requirement to delay processing of some of the events.

For example, I have three events published to Kafka:

  • A (id: 1, retryAt: now)
  • B (id: 2, retryAt: 10 minutes later)
  • C (id: 3, retryAt: now)

I need to process records A and C immediately, while record B should be processed ten minutes later. Is this feasible in Apache Flink?

From what I have researched so far, Flink "Triggers" seem like they might help, but I haven't been able to implement them correctly yet.

I also looked through the Kafka documentation, but this doesn't seem feasible there.


Solution

  • Triggers are for windows, but windowing doesn't seem appropriate for your use case.

    A better solution would be to use Timers with a KeyedProcessFunction. Depending on whether you want to wait for 10 minutes of processing time or 10 minutes of event time, you'll choose processing-time timers or event-time timers.

    You'll also need to use Flink state to store the events that need to be processed later.

    You'll find the documentation for process functions on the "Process Function" page of the Flink DataStream API docs. There are some additional examples in the Flink training exercises.

    FWIW, Flink's Stateful Functions API might be a better fit for what you're doing, in which case you would use delayed messages.
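The timer-plus-state approach from the answer can be sketched as follows. This is a minimal, non-authoritative sketch assuming the Flink 1.x DataStream API in Java with processing-time timers. The `Event` class (with `id`, `retryAt` as an epoch-millis timestamp, and `payload`) and the names `DelayedEvents` and `DelayUntilRetryAt` are hypothetical, and the Kafka source and sink are omitted. Events that are already due are forwarded immediately. Anything else is buffered in keyed `ListState` until a timer registered at its `retryAt` fires.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class DelayedEvents {

    /** Hypothetical event shape; retryAt is an epoch-millis processing-time instant. */
    public static class Event {
        public long id;
        public long retryAt;
        public String payload;

        public Event() {} // Flink POJO rules require a public no-arg constructor

        public Event(long id, long retryAt, String payload) {
            this.id = id;
            this.retryAt = retryAt;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return "Event(" + id + ", " + payload + ")";
        }
    }

    /** Emits due events right away; buffers the rest until their processing-time timer fires. */
    public static class DelayUntilRetryAt extends KeyedProcessFunction<Long, Event, Event> {

        // Keyed state: events for the current key that are not yet due.
        private transient ListState<Event> pending;

        @Override
        public void open(Configuration parameters) {
            pending = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("pending", Event.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            long now = ctx.timerService().currentProcessingTime();
            if (event.retryAt <= now) {
                out.collect(event); // already due (like A and C): forward immediately
            } else {
                pending.add(event); // not due yet (like B): keep it in Flink state
                // Flink de-duplicates timers per key and timestamp.
                ctx.timerService().registerProcessingTimeTimer(event.retryAt);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            List<Event> stillPending = new ArrayList<>();
            Iterable<Event> buffered = pending.get();
            if (buffered != null) {
                for (Event e : buffered) {
                    if (e.retryAt <= timestamp) {
                        out.collect(e); // its time has come: emit it
                    } else {
                        stillPending.add(e); // a later timer will release it
                    }
                }
            }
            if (stillPending.isEmpty()) {
                pending.clear(); // nothing left for this key, so free the state
            } else {
                pending.update(stillPending);
            }
        }
    }
}
```

Wiring it up would look roughly like `events.keyBy(e -> e.id).process(new DelayedEvents.DelayUntilRetryAt())`, where `events` is a `DataStream<Event>` read from Kafka. Keying by `id` gives each event its own state and timer, but any deterministic key works. For event-time semantics, you could register timers with `registerEventTimeTimer` and compare `retryAt` against `ctx.timerService().currentWatermark()` instead. That requires a `WatermarkStrategy` on the source, and delayed events are then released as watermarks advance rather than by wall-clock time. On Flink 1.19 and later, the `open(OpenContext)` overload is the preferred replacement for `open(Configuration)`.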