FLINK CEP (Java 8) - persistent "identity" through matching pattern


I am trying to use Flink CEP to measure the time a bid in a market takes to go from BidState.OPENED to BidState.CLOSED. I am receiving a DataStream of bids with IDs and states, and as it stands I am matching every OPENED bid with every CLOSED bid.

I have a conditional in patternStream.process that only lets opening and closing bids with the same ID be paired, as they should be. This feels wrong, though: the number of matches grows very quickly this way, and I suspect the check could be expressed in the pattern itself. So, is there a way to make sure that the "start" and "end" events have the same ID?

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidId() == start.getBidId()?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);

patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String, List<BidEvent>> map,
                             Context context,
                             Collector<MatchingDuration> collector) {

        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
            collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
        }
    }
}).addSink(matchingDurationSinkFunction);

Solution

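  • I figured out how to get the behaviour I wanted: BidEventDataStream must be keyed, so that the pattern is only matched among events that share the same key. The pattern and the process function in the question need no changes; only the stream handed to CEP.pattern has to be keyed by BidEvent.getBidId(). Once the stream is keyed, every match contains events of a single bid, so the ID comparison in processMatch becomes redundant: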

    // Key the stream by bid ID so that CEP evaluates the pattern separately for each bid.
    BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
        @Override
        public Long getKey(BidEvent value) {
            return value.getBidId();
        }
    })
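
To make the effect of keying concrete, here is a minimal plain-Java sketch that does not use Flink at all. It only imitates the two situations: pairing every OPENED event with every later CLOSED event (roughly what the unkeyed pattern with followedByAny produces), and pairing within each bid ID after partitioning (roughly what the keyed stream gives you). The class KeyedMatchingSketch, its BidEvent stand-in and the sample data are all hypothetical and exist only for this illustration; they are not the types from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedMatchingSketch {

    public enum BidState { OPENED, CLOSED }

    // Hypothetical stand-in for the question's BidEvent; only the fields used here.
    public static final class BidEvent {
        final long bidId;
        final BidState state;
        final long timestamp;

        public BidEvent(long bidId, BidState state, long timestamp) {
            this.bidId = bidId;
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    // Unkeyed imitation: every OPENED event pairs with every later CLOSED event,
    // regardless of bid ID, so the candidate count grows with opens * closes.
    public static int countUnkeyedPairs(List<BidEvent> events) {
        int pairs = 0;
        for (int i = 0; i < events.size(); i++) {
            if (events.get(i).state != BidState.OPENED) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).state == BidState.CLOSED) {
                    pairs++;
                }
            }
        }
        return pairs;
    }

    // Keyed imitation: partition by bid ID first, then pair the first OPENED
    // with the first following CLOSED inside each partition.
    public static Map<Long, Long> durationsPerBid(List<BidEvent> events) {
        Map<Long, List<BidEvent>> byBid = new LinkedHashMap<>();
        for (BidEvent e : events) {
            byBid.computeIfAbsent(e.bidId, k -> new ArrayList<>()).add(e);
        }
        Map<Long, Long> durations = new LinkedHashMap<>();
        for (Map.Entry<Long, List<BidEvent>> entry : byBid.entrySet()) {
            BidEvent open = null;
            for (BidEvent e : entry.getValue()) {
                if (e.state == BidState.OPENED && open == null) {
                    open = e;
                } else if (e.state == BidState.CLOSED && open != null) {
                    durations.put(entry.getKey(), e.timestamp - open.timestamp);
                    break;
                }
            }
        }
        return durations;
    }

    // Two bids whose OPENED/CLOSED events are interleaved in arrival order.
    public static List<BidEvent> sampleEvents() {
        return Arrays.asList(
                new BidEvent(1L, BidState.OPENED, 100L),
                new BidEvent(2L, BidState.OPENED, 110L),
                new BidEvent(1L, BidState.CLOSED, 150L),
                new BidEvent(2L, BidState.CLOSED, 190L));
    }

    public static void main(String[] args) {
        List<BidEvent> events = sampleEvents();
        System.out.println("unkeyed candidate pairs: " + countUnkeyedPairs(events)); // 4
        System.out.println("keyed durations: " + durationsPerBid(events)); // {1=50, 2=80}
    }
}
```

With only two bids the unkeyed variant already produces four candidate pairs, two of which mix different bids and would have to be filtered out afterwards, while the partitioned variant yields exactly one duration per bid. In the actual job Flink does this partitioning for you once the stream passed to CEP.pattern is keyed.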