apache-flink flink-streaming flink-cep

How to get the operation cost in Flink using the cost estimator classes that Flink provides


I want to do a performance analysis of the Flink CEP engine, and I came across these classes:

org.apache.flink.optimizer.costs.CostEstimator; 
org.apache.flink.optimizer.costs.Costs; 
org.apache.flink.optimizer.costs.DefaultCostEstimator;

The problem is that I don't know how to use any of these classes. Can someone provide some code or a hint on how to get the cost estimate for an operator (a join, for example) in Flink?

Below is the code for a join that I am performing in Flink:

DataStream<JoinedEvent> joinedEventDataStream = stream1.join(stream2)
        .where(new KeySelector<RRIntervalStreamEvent, Long>() {
            @Override
            public Long getKey(RRIntervalStreamEvent rrIntervalStreamEvent) throws Exception {
                return rrIntervalStreamEvent.getTime();
            }
        })
        .equalTo(new KeySelector<qrsIntervalStreamEvent, Long>() {
            @Override
            public Long getKey(qrsIntervalStreamEvent qrsIntervalStreamEvent) throws Exception {
                return qrsIntervalStreamEvent.getTime();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply(new JoinFunction<RRIntervalStreamEvent, qrsIntervalStreamEvent, JoinedEvent>() {
            @Override
            public JoinedEvent join(RRIntervalStreamEvent rr, qrsIntervalStreamEvent qrs) throws Exception {
                // Getting the cost (just checking):
                // costs.getCpuCost();
                return new JoinedEvent(rr.getTime(), rr.getSensor_id(), qrs.getSensor_id(),
                        rr.getRRInterval(), qrs.getQrsInterval());
            }
        });

How can I compute the cost for this join?


Solution

  • These cost classes belong to the optimizer of the DataSet API (Flink's batch processing API), whereas the CEP library is built on the DataStream API. The DataStream API does not build on the DataSet API.

    The CEP library and the DataSet optimizer are therefore completely unrelated, so these classes cannot be used to estimate the cost of a CEP pattern. I'm also not aware of any other built-in way to estimate the cost of a CEP pattern (or of any other DataStream program).
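Since there is no built-in estimator for DataStream programs, one rough workaround is to measure instead of estimate. The sketch below is plain Java with no Flink dependency: `TimedJoin` is a hypothetical helper (not a Flink class) that wraps a two-input function and records how long each call takes with `System.nanoTime()`. `BiFunction` only stands in for the body of a join function here. This yields observed wall-clock time per call, which is not the same thing as the optimizer-style cost that `Costs` models.

```java
import java.util.function.BiFunction;

/**
 * Hypothetical helper, not part of Flink: wraps a two-input function
 * and accumulates the wall-clock time spent inside it.
 */
public class TimedJoin<L, R, O> implements BiFunction<L, R, O> {
    private final BiFunction<L, R, O> delegate;
    private long calls;       // number of completed invocations
    private long totalNanos;  // summed elapsed time across invocations

    public TimedJoin(BiFunction<L, R, O> delegate) {
        this.delegate = delegate;
    }

    @Override
    public O apply(L left, R right) {
        long start = System.nanoTime();
        try {
            return delegate.apply(left, right);
        } finally {
            // Record the elapsed time even if the delegate throws.
            totalNanos += System.nanoTime() - start;
            calls++;
        }
    }

    public long getCalls() {
        return calls;
    }

    public long getTotalNanos() {
        return totalNanos;
    }

    /** Mean time per call in nanoseconds, or 0.0 before the first call. */
    public double getMeanNanos() {
        return calls == 0 ? 0.0 : (double) totalNanos / calls;
    }

    public static void main(String[] args) {
        // Stand-in for the join logic: combine two values into one.
        TimedJoin<Long, Long, Long> timed = new TimedJoin<>((a, b) -> a + b);
        for (long i = 0; i < 1000; i++) {
            timed.apply(i, i);
        }
        System.out.println("calls=" + timed.getCalls()
                + ", mean ns=" + timed.getMeanNanos());
    }
}
```

In a real job the same `System.nanoTime()` bookkeeping could sit directly inside the `join` method shown in the question. The counters here are not thread-safe and exist per instance, so with parallel operators each subtask would report its own numbers.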