I want to do a performance analysis of the Flink CEP engine, and I came across these classes:
org.apache.flink.optimizer.costs.CostEstimator;
org.apache.flink.optimizer.costs.Costs;
org.apache.flink.optimizer.costs.DefaultCostEstimator;
However, I don't know how to use any of these classes. Can someone provide some code or a hint on how to find the cost estimate for an operator (a join, for example) in Flink?
Below is the code for a join that I am performing in Flink:
DataStream<JoinedEvent> joinedEventDataStream = stream1.join(stream2)
    .where(new KeySelector<RRIntervalStreamEvent, Long>() {
        @Override
        public Long getKey(RRIntervalStreamEvent rrIntervalStreamEvent) throws Exception {
            return rrIntervalStreamEvent.getTime();
        }
    })
    .equalTo(new KeySelector<qrsIntervalStreamEvent, Long>() {
        @Override
        public Long getKey(qrsIntervalStreamEvent qrsIntervalStreamEvent) throws Exception {
            return qrsIntervalStreamEvent.getTime();
        }
    })
    .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
    .apply(new JoinFunction<RRIntervalStreamEvent, qrsIntervalStreamEvent, JoinedEvent>() {
        @Override
        public JoinedEvent join(RRIntervalStreamEvent rr, qrsIntervalStreamEvent qrs) throws Exception {
            // getting the cost -- just checking
            // costs.getCpuCost();
            return new JoinedEvent(rr.getTime(), rr.getSensor_id(), qrs.getSensor_id(), rr.getRRInterval(), qrs.getQrsInterval());
        }
    });
How can I compute the cost for this join?
The cost classes belong to the optimizer of the DataSet API (Flink's batch processing API) while the CEP library is built on the DataStream API. The DataStream API does not leverage the DataSet API.
The CEP library and the DataSet optimizer are completely unrelated. Hence, it is not possible to use this code to estimate the cost of a CEP pattern. I'm also not aware of another built-in method to estimate the cost of a CEP pattern (or any other DataStream program).
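Since there is no built-in estimator, one rough alternative is to measure the cost empirically instead of estimating it. The following is only a minimal sketch in plain Java, not a Flink API: `TimedJoin` and its accessor methods are hypothetical names made up for illustration, and the wrapper only records wall-clock time spent inside the wrapped function.

```java
import java.util.function.BiFunction;

// Hypothetical helper, not part of Flink: wraps any two-argument function
// and records how often it is called and how much wall-clock time it takes.
final class TimedJoin<A, B, R> implements BiFunction<A, B, R> {
    private final BiFunction<A, B, R> delegate;
    private long calls;
    private long totalNanos;

    TimedJoin(BiFunction<A, B, R> delegate) {
        this.delegate = delegate;
    }

    @Override
    public R apply(A left, B right) {
        long start = System.nanoTime();
        try {
            return delegate.apply(left, right);
        } finally {
            // Count the call and its elapsed time even if the delegate throws.
            totalNanos += System.nanoTime() - start;
            calls++;
        }
    }

    long getCalls() { return calls; }

    long getTotalNanos() { return totalNanos; }

    double getAverageNanos() {
        return calls == 0 ? 0.0 : (double) totalNanos / calls;
    }

    public static void main(String[] args) {
        // Stand-in for the real join logic: adds two Long values.
        TimedJoin<Long, Long, Long> timed = new TimedJoin<>((a, b) -> a + b);
        for (long i = 0; i < 1_000; i++) {
            timed.apply(i, i);
        }
        System.out.println("calls = " + timed.getCalls());
        System.out.println("avg ns per call = " + timed.getAverageNanos());
    }
}
```

The same start/stop timing could be placed directly in the body of the `join` method of the `JoinFunction` from the question. Note that this measures only the user function, not the cost of windowing, state access, or network transfer, and the counters are not thread-safe, so each parallel instance would need its own.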