I have a Flink application and want to evaluate its performance.
My plan is to use a bounded historical dataset as the data source in the experiment: by measuring the total time cost, I can derive the throughput, and by querying the latency metric, I can observe the latency behavior.
However, when using Kafka as the data source, after consuming the specified amount of data the app still seems to wait for more data and never transitions to the FINISHED state. I suspect that during this waiting period the measured throughput and latency degrade in a way that is unrelated to the application's actual performance.
Under these conditions, how can I get an accurate time cost and latency measurement for my bounded dataset, and avoid the influence of this extra waiting stage?
If you use KafkaSource, rather than FlinkKafkaConsumer, you can specify ending offsets by calling setBounded, as in
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
env.fromSource(source, watermarkStrategy, "name");
That way a bounded job using Kafka as a source will cleanly come to an end.
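Once the job terminates on its own, env.execute() returns a JobExecutionResult, whose getNetRuntime() gives the job's elapsed time without any idle waiting tail. A minimal sketch of turning that into a throughput figure (the helper class and the sample numbers are hypothetical; the record count is something you already know from your bounded dataset):

```java
// Hypothetical helper: derive throughput from a known record count and the
// net runtime reported by Flink's JobExecutionResult#getNetRuntime().
public class ThroughputCalc {

    // records processed divided by elapsed wall-clock time, in records/second
    static double recordsPerSecond(long records, long runtimeMillis) {
        return records / (runtimeMillis / 1000.0);
    }

    public static void main(String[] args) {
        // In a real run you would obtain runtimeMillis like this:
        //   JobExecutionResult result = env.execute("benchmark");
        //   long runtimeMillis = result.getNetRuntime(TimeUnit.MILLISECONDS);
        long records = 10_000_000L;  // size of the bounded dataset (assumed)
        long runtimeMillis = 25_000L; // example value for illustration
        System.out.println(recordsPerSecond(records, runtimeMillis) + " rec/s");
    }
}
```

Because the source is bounded, the runtime ends when the last record is processed, so the throughput figure is not diluted by an idle waiting stage.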
If you need to accomplish the same thing with a FlinkKafkaConsumer, then implement a DeserializationSchema (or a KafkaDeserializationSchema) that at some point returns true from isEndOfStream.
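A sketch of that approach, assuming Flink's DeserializationSchema interface; the record-count threshold N_RECORDS is a hypothetical parameter you would set to the size of your bounded dataset (note that with parallelism greater than 1, each consumer instance keeps its own count):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Sketch: a schema that declares end-of-stream after a fixed number of records.
public class BoundedStringSchema implements DeserializationSchema<String> {

    private static final long N_RECORDS = 10_000_000L; // hypothetical bound
    private long count = 0;

    @Override
    public String deserialize(byte[] message) throws IOException {
        count++;
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true tells the consumer to stop, letting the job FINISH
        return count >= N_RECORDS;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

This is a per-instance counter, so for a precise global bound the setBounded approach on KafkaSource is the cleaner option.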