I am trying to use Flink CEP to measure the time a bid in a market takes to go from BidState.OPENED to BidState.CLOSED. I am receiving a DataStream of bids with IDs and states, and as it stands I am matching all OPENED bids with all CLOSED bids.
I have a conditional in patternStream.process that only allows opening and closing bids with the same ID to be paired, as they should be. This feels wrong, though: the number of matches grows very quickly this way, and I suspect this could be done with patterns instead. So, is there a way to make sure that the "start" and "end" events have the same ID?
```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.util.Collector;

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
// Is it possible to make sure that start.getBidId() == end.getBidId() in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidId() == start.getBidId()?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);
patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String, List<BidEvent>> map,
                             Context context,
                             Collector<MatchingDuration> collector) {
        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        // Make sure the opening and closing bid are the same. Can this be done in the pattern?
        // Objects.equals is safe even if getBidId() returns a boxed Long (== would compare references).
        if (Objects.equals(start.getBidId(), end.getBidId())) {
            collector.collect(new MatchingDuration(start.getBidId(), end.getTimestamp() - start.getTimestamp()));
        }
    }
}).addSink(matchingDurationSinkFunction);
```
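The growth described in the question can be made concrete. The plain-Java sketch below does not use Flink at all; it just counts (OPENED, later CLOSED) pairs in a sequence of events, which approximates what an unkeyed followedByAny pattern with noSkip emits, while ignoring the within(timeout) window. The Event record and the interleaved() helper are hypothetical stand-ins, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;

public class MatchGrowth {
    // Hypothetical minimal event: a bid ID plus whether it is OPENED (true) or CLOSED (false)
    record Event(long bidId, boolean opened) {}

    // Counts (OPENED, later CLOSED) pairs, ignoring the within(timeout) window.
    // sameIdOnly == false approximates followedByAny + noSkip on an unkeyed stream;
    // sameIdOnly == true approximates the same pattern on a stream keyed by bid ID.
    static long countPairs(List<Event> events, boolean sameIdOnly) {
        long pairs = 0;
        for (int i = 0; i < events.size(); i++) {
            Event start = events.get(i);
            if (!start.opened()) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                Event end = events.get(j);
                if (!end.opened() && (!sameIdOnly || end.bidId() == start.bidId())) {
                    pairs++;
                }
            }
        }
        return pairs;
    }

    // Builds n bids that open and close one after another: OPEN 1, CLOSE 1, OPEN 2, CLOSE 2, ...
    static List<Event> interleaved(int n) {
        List<Event> events = new ArrayList<>();
        for (long id = 1; id <= n; id++) {
            events.add(new Event(id, true));
            events.add(new Event(id, false));
        }
        return events;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            List<Event> events = interleaved(n);
            System.out.println(n + " bids: unkeyed=" + countPairs(events, false)
                    + ", keyed=" + countPairs(events, true));
        }
    }
}
```

For bids that open and close one after another, the unkeyed count is n(n+1)/2 (55 for 10 bids, 5050 for 100), because every OPENED event pairs with every later CLOSED event, whereas restricting pairs to the same ID leaves exactly one match per bid.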
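I figured out how to get the behaviour I wanted: BidEventDataStream must be keyed, because CEP on a keyed stream only matches the pattern against events that share the same key. The pattern and process function from the question can stay as they are (the ID check in processMatch becomes redundant, but it does no harm); the only change is to key BidEventDataStream by BidEvent.getBidId() and pass the resulting keyed stream to CEP.pattern: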
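```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
// Key by bid ID so CEP evaluates the pattern separately for each bid
KeyedStream<BidEvent, Long> keyedBidEvents = BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
    @Override
    public Long getKey(BidEvent value) {
        return value.getBidId();
    }
});
PatternStream<BidEvent> patternStream = CEP.pattern(keyedBidEvents, pattern);
```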
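To illustrate what keying buys, here is a plain-Java sketch (not Flink, and not how Flink CEP is implemented internally) that keeps one piece of state per bid ID, so a CLOSED event can only ever be paired with the OPENED event of the same bid. The BidEvent, BidState and MatchingDuration types here are simplified, hypothetical versions of the question's classes, and the within(timeout) window is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedBidDurations {
    enum BidState { OPENED, CLOSED }

    // Hypothetical, simplified stand-ins for the question's BidEvent and MatchingDuration
    record BidEvent(long bidId, BidState state, long timestamp) {}
    record MatchingDuration(long bidId, long durationMillis) {}

    // Mimics the effect of keyBy(bidId): state is kept per bid ID, so a CLOSED
    // event is only ever paired with the OPENED event of the same bid.
    static List<MatchingDuration> durations(List<BidEvent> events) {
        Map<Long, Long> openedAt = new HashMap<>(); // per-key state: bidId -> OPENED timestamp
        List<MatchingDuration> out = new ArrayList<>();
        for (BidEvent e : events) {
            if (e.state() == BidState.OPENED) {
                openedAt.put(e.bidId(), e.timestamp());
            } else {
                // Consume the opening timestamp for this bid, if one was seen
                Long start = openedAt.remove(e.bidId());
                if (start != null) {
                    out.add(new MatchingDuration(e.bidId(), e.timestamp() - start));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<BidEvent> events = List.of(
                new BidEvent(1, BidState.OPENED, 100),
                new BidEvent(2, BidState.OPENED, 150),
                new BidEvent(1, BidState.CLOSED, 400),
                new BidEvent(2, BidState.CLOSED, 900));
        System.out.println(durations(events));
    }
}
```

With the sample events in main, durations() yields 300 for bid 1 and 750 for bid 2, and no cross-ID pairs such as (OPENED 1, CLOSED 2) are ever produced, which is the behaviour the keyed CEP stream gives.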