apache-flink, flink-streaming

How to measure throughput and latency in Flink when using a bounded Kafka data source?


I have a Flink application and want to evaluate its performance.

My plan is to use a bounded historical dataset as the data source for the experiment. By measuring the total time cost I can compute the throughput, and by querying the latency metric I can observe the latency behavior.

However, when using Kafka as the data source, the app keeps waiting for more data after consuming the specified amount and never transitions to the FINISHED state. I suspect that during this waiting period the measured throughput and latency drift lower, which is unrelated to the application's actual performance.

In this situation, how can I get an accurate time cost and latency measurement for my bounded dataset, and avoid the influence of the extra waiting stage?


Solution

  • If you use KafkaSource, rather than FlinkKafkaConsumer, you can specify ending offsets by calling setBounded, as in

    KafkaSource<String> source = KafkaSource
            .<String>builder()
            .setBootstrapServers(...)
            .setGroupId(...)
            .setTopics(...)
            .setDeserializer(...)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setBounded(OffsetsInitializer.latest())
            .build();
    
    env.fromSource(source, watermarkStrategy, "name");
    

    That way a bounded job using Kafka as a source will cleanly come to an end.
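    Once the job terminates on its own, you can measure the total time cost directly from the JobExecutionResult that env.execute() returns for a finished job. A minimal sketch (the job name and record count are illustrative assumptions; the record count should match the size of your history dataset):

    ```java
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ThroughputTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // ... build the pipeline with the bounded KafkaSource shown above ...

            // execute() blocks until the bounded job reaches FINISHED and
            // returns a JobExecutionResult for the completed run.
            JobExecutionResult result = env.execute("throughput-test");

            long runtimeMs = result.getNetRuntime(); // wall-clock job runtime in milliseconds
            long recordCount = 1_000_000L;           // assumed size of the bounded dataset
            double throughput = recordCount / (runtimeMs / 1000.0);
            System.out.println("records/sec: " + throughput);
        }
    }
    ```

    Because the job now ends cleanly at the last offset, getNetRuntime() covers only the actual processing, with no idle waiting tail inflating the measurement.
    
    
    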

    If you need to accomplish the same thing with a FlinkKafkaConsumer, then implement a DeserializationSchema (or a KafkaDeserializationSchema) whose isEndOfStream method returns true once the end of the bounded dataset has been reached.
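    A minimal sketch of that approach, assuming string-valued records and a simple record-count bound (the class name and the fixed count are illustrative; you could equally stop on an end-of-data marker record):

    ```java
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Hypothetical schema that ends the stream after a fixed number of records.
    public class BoundedStringSchema implements DeserializationSchema<String> {
        private final long maxRecords; // assumed bound: size of the history dataset
        private long seen = 0;

        public BoundedStringSchema(long maxRecords) {
            this.maxRecords = maxRecords;
        }

        @Override
        public String deserialize(byte[] message) throws IOException {
            seen++;
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Returning true tells the FlinkKafkaConsumer to stop consuming,
            // letting the job transition to FINISHED.
            return seen >= maxRecords;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }
    ```

    Pass an instance to the FlinkKafkaConsumer constructor in place of a plain SimpleStringSchema; note that this per-subtask counter only bounds each parallel consumer instance, so with parallelism greater than one a marker-based end condition is usually more reliable.
    
    
    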