Please help me, I have two questions:
I read JSON messages from Apache Kafka (followed by these steps: deserialization to a POJO, filter, keyBy, ...).
I can check the input sequence in a KeyedProcessFunction (check state, if-else blocks, out.collect(...), state.clear(), and so on), or I can use the Flink CEP library with conditions and quantifiers.
For example:
I have the input sequence: A1, (no events for 1 min) A2, (no events for 5 min) A3, (no events for 1 min) A4, (no events for more than 5 min) A5. (There may be many events between A1 and A5.)
I want to send to the output: A1, A3, A5.
The first event is always sent. After that, if the next event arrives less than 5 minutes after the previous event, it is not sent to the output; if it arrives more than 5 minutes after the previous event, it is sent to the output.
What should I add to my pattern?
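import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
// Single-event pattern: accept events whose name contains "A"; within() bounds a match to 5 minutes.
Pattern<Event, ?> pattern = Pattern
    .<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().contains("A");
        }
    })
    .within(Time.minutes(5));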
While at first glance this particular example seems rather trivial to implement as a KeyedProcessFunction, some complexity definitely arises if the messages can arrive out of order. You might then be fooled into thinking there has been a substantial gap when in fact there was not.
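To make the out-of-order pitfall concrete, here is a minimal plain-Java sketch (no Flink involved). The `GapFilter` class, the `Event` record, and the timestamps (A1 at 0 min, A2 at 1 min, A3 at 6 min, A4 at 7 min, A5 at 13 min) are assumptions made up for illustration, as is treating a gap of exactly 5 minutes as large enough. It applies the "emit only after a gap" rule once in arrival order and once after sorting by event time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GapFilter {
    // Assumed threshold: 5 minutes, in milliseconds.
    static final long GAP_MS = 5 * 60 * 1000L;

    // Hypothetical event type: a name plus an event-time timestamp.
    record Event(String name, long timestampMs) {}

    // Emits the first event, then every event whose timestamp is at least
    // GAP_MS after the event processed just before it (in the given order).
    static List<String> emitAfterGaps(List<Event> events) {
        List<String> out = new ArrayList<>();
        Long previousTs = null;
        for (Event e : events) {
            if (previousTs == null || e.timestampMs() - previousTs >= GAP_MS) {
                out.add(e.name());
            }
            previousTs = e.timestampMs();
        }
        return out;
    }

    // Same rule, but the events are first sorted by event time.
    static List<String> emitAfterGapsSorted(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(Event::timestampMs));
        return emitAfterGaps(sorted);
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // A2 arrives late, after A3.
        List<Event> arrival = List.of(
            new Event("A1", 0), new Event("A3", 6 * min), new Event("A2", 1 * min),
            new Event("A4", 7 * min), new Event("A5", 13 * min));
        System.out.println(emitAfterGaps(arrival));       // [A1, A3, A4, A5]
        System.out.println(emitAfterGapsSorted(arrival)); // [A1, A3, A5]
    }
}
```

In arrival order, A4 looks as if it follows a 6-minute gap after the late A2, so it is emitted by mistake. Sorting by event time removes that false gap. In a real streaming job you cannot sort the whole stream up front, so this only illustrates the problem, not a way to solve it in Flink.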
However, this particular example is a good match for session windows, if you want an easy, ready-made solution.
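The idea behind the session-window approach can be sketched in plain Java: events separated by less than the gap fall into the same session, and the first event of each session is the one you want. This is only an imitation of the grouping logic, not Flink's window API. `SessionSketch`, `Event`, and the timestamps are hypothetical, and the handling of a gap of exactly 5 minutes is an assumption of this sketch that may differ from how Flink merges session windows.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionSketch {
    // Assumed session gap: 5 minutes, in milliseconds.
    static final long GAP_MS = 5 * 60 * 1000L;

    // Hypothetical event type: a name plus an event-time timestamp.
    record Event(String name, long timestampMs) {}

    // Splits events (already sorted by timestamp) into sessions: a new
    // session starts whenever the gap to the previous event is >= GAP_MS.
    static List<List<Event>> sessions(List<Event> sortedEvents) {
        List<List<Event>> sessions = new ArrayList<>();
        List<Event> current = null;
        long lastTs = 0;
        for (Event e : sortedEvents) {
            if (current == null || e.timestampMs() - lastTs >= GAP_MS) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(e);
            lastTs = e.timestampMs();
        }
        return sessions;
    }

    // Returns the name of the first event in each session.
    static List<String> firstOfEachSession(List<Event> sortedEvents) {
        List<String> out = new ArrayList<>();
        for (List<Event> session : sessions(sortedEvents)) {
            out.add(session.get(0).name());
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Event> events = List.of(
            new Event("A1", 0), new Event("A2", 1 * min), new Event("A3", 6 * min),
            new Event("A4", 7 * min), new Event("A5", 13 * min));
        System.out.println(firstOfEachSession(events)); // [A1, A3, A5]
    }
}
```

With these assumed timestamps the sessions are {A1, A2}, {A3, A4}, and {A5}, so taking the first element of each gives the output the question asks for.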
With CEP, I think a working solution would have this flavor: you are looking for a sequence of an A (call it A1) followed immediately by another A (call it A2), where (A2.timestamp - A1.timestamp) >= 5 minutes. When a match is found, emit A1 and advance the matching engine so that A2 becomes the new A1. (Conveniently, CEP pre-sorts the input stream(s), so you don't have to worry about things being out-of-order.)
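The matching logic described above can be imitated in plain Java to see which pairs would match. This is not the Flink CEP API, just a sketch of the "A followed immediately by A with a gap of at least 5 minutes" rule over a sorted list. `PairMatchSketch`, `Event`, `Match`, and the timestamps are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PairMatchSketch {
    // Assumed threshold: 5 minutes, in milliseconds.
    static final long GAP_MS = 5 * 60 * 1000L;

    // Hypothetical event type: a name plus an event-time timestamp.
    record Event(String name, long timestampMs) {}

    // A matched pair of directly adjacent events.
    record Match(Event first, Event second) {}

    // Sorts by event time (standing in for the pre-sorting CEP does), then
    // reports each pair of adjacent events at least GAP_MS apart. The second
    // event of one pair is the candidate first event of the next pair.
    static List<Match> matches(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(Event::timestampMs));
        List<Match> out = new ArrayList<>();
        for (int i = 0; i + 1 < sorted.size(); i++) {
            Event first = sorted.get(i);
            Event second = sorted.get(i + 1);
            if (second.timestampMs() - first.timestampMs() >= GAP_MS) {
                out.add(new Match(first, second));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Event> events = List.of(
            new Event("A1", 0), new Event("A2", 1 * min), new Event("A3", 6 * min),
            new Event("A4", 7 * min), new Event("A5", 13 * min));
        for (Match m : matches(events)) {
            System.out.println(m.first().name() + " -> " + m.second().name());
        }
        // Prints:
        // A2 -> A3
        // A4 -> A5
    }
}
```

With these assumed timestamps the matches are (A2, A3) and (A4, A5). The sketch returns both elements of each match and leaves the choice of which one to emit to the caller.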