apache-flink complex-event-processing flink-cep

Flink CEP not detecting the last record


My Flink CEP pattern checks whether a record's amount is above 25. With processing time it matches every record, but with event time it does not match the last record.

{"trasanction_id":196,"customer_id":27,"datetime":"1576499008876","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":27,"datetime":"1576499017565","amount":547,"state":"SUCCESS"}
{"trasanction_id":198,"customer_id":27,"datetime":"1576499029116","amount":6824,"state":"SUCCESS"}
{"trasanction_id":196,"customer_id":27,"datetime":"1576499053211","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":28,"datetime":"1576499063867","amount":547,"state":"FAILED"}
{"trasanction_id":198,"customer_id":28,"datetime":"1576499073566","amount":6824,"state":"FAILED"}

These are my records. I want to match every event whose amount is greater than 25, using event time. Since every record has an amount above 25, all six should match, and with processing time they do. I am currently using a bounded-out-of-orderness timestamp extractor with a maximum out-of-orderness of 3 seconds.

Please help me understand this. Thanks in Advance! :)


Solution

  • Because CEP matches temporal patterns, when it works with event-time timestamps it first sorts the events by timestamp. Sorting requires buffering each event until the watermark catches up to it, which gives any earlier events time to arrive first.

    Because your watermarks are configured to trail the leading edge of your stream (i.e., the largest timestamp seen so far) by 3 seconds, the watermark will not reach the timestamp of the last event unless a later event arrives. This is why the last event is not processed: Flink is still waiting to see whether any earlier events will arrive, and it won't give up until the watermark indicates that the stream is complete up through the timestamp of that last event.
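
The buffering described above can be illustrated with a small plain-Java simulation. This is a sketch under simplifying assumptions, not Flink's actual implementation: it uses no Flink classes, the class name `WatermarkSketch` and the method `released` are hypothetical, and the watermark is recomputed after every event as the largest timestamp seen minus the bound (Flink normally emits watermarks periodically). The timestamps are the six `datetime` values from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSketch {

    /**
     * Simulates an event-time buffer: each event is held until the watermark
     * (largest timestamp seen so far minus boundMs) reaches its timestamp.
     * Returns the timestamps that were released, in timestamp order.
     */
    public static List<Long> released(long[] arrivals, long boundMs) {
        PriorityQueue<Long> buffer = new PriorityQueue<>(); // sorted by timestamp
        List<Long> out = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;

        for (long ts : arrivals) {
            buffer.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - boundMs; // trails the leading edge
            // Release every buffered event the watermark has caught up to.
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                out.add(buffer.poll());
            }
        }
        return out; // anything still in the buffer was never released
    }

    public static void main(String[] args) {
        long[] arrivals = {
            1576499008876L, 1576499017565L, 1576499029116L,
            1576499053211L, 1576499063867L, 1576499073566L
        };
        List<Long> out = released(arrivals, 3_000L);
        // prints "released 5 of 6 events"
        System.out.println("released " + out.size() + " of " + arrivals.length + " events");
    }
}
```

With a 3-second bound, the final watermark is 1576499070566, which is below the last event's timestamp of 1576499073566, so that event stays in the buffer until a later event pushes the watermark past it.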