apache-flink, flink-streaming

How to print the total number of lines in files using Flink


I am reading lines from Parquet files using a source function similar to this one. However, when I try to count the number of lines being processed, nothing is printed, even though the job completes:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
lazy val stream: DataStream[Group] = env.addSource(new ParquetSourceFunction)
stream.map(_ => 1)                     // emit a 1 for every record
    .timeWindowAll(Time.seconds(180))  // 180-second processing-time window over the whole stream
    .reduce(_ + _)                     // sum the ones to get the record count
    .print()

Solution

  • The problem is that you are using ProcessingTime. With EventTime, when the file is finished Flink emits a watermark with the value Long.MaxValue, which closes all open windows. This does not happen with ProcessingTime: Flink does not wait for your 180-second window to close before the job ends, so the window never fires and nothing is printed.

    You may want to switch to the DataSet API, which is better suited to counting the records of a bounded input.

    Alternatively, you can switch to EventTime and assign a static watermark, since Flink will still emit a watermark with the value Long.MaxValue at the end of the input, which fires the window.
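The DataSet suggestion above could look roughly like the sketch below. It assumes the Flink Scala batch API (`flink-scala`) is on the classpath. The object name `CountLinesBatch`, the helper `countRecords`, and the three-element input are hypothetical stand-ins; in the real job the DataSet would come from a Parquet input format instead of `fromElements`.

```scala
import org.apache.flink.api.scala._

object CountLinesBatch {
  // count() triggers execution of the batch job and returns the number of records.
  def countRecords[T](records: DataSet[T]): Long = records.count()

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Stand-in input (assumption); replace with the DataSet read from Parquet.
    val records: DataSet[String] = env.fromElements("a", "b", "c")
    println(countRecords(records))
  }
}
```

Because the input is bounded, the batch job needs no windows or watermarks: the count is produced once all records have been read.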
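The event-time alternative could be sketched as follows, assuming a Flink version that still offers the API used in the question (`setStreamTimeCharacteristic`, `timeWindowAll`), which later releases deprecate. The object name `CountLinesEventTime`, the helper `countAll`, and the `fromElements` input are hypothetical; in the real job the stream would come from `env.addSource(new ParquetSourceFunction)`. Every record gets the same constant timestamp, so all records fall into one window, and that window should fire when the bounded source finishes and Flink emits the final Long.MaxValue watermark.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object CountLinesEventTime {
  // Counts all records of a bounded stream inside a single event-time window.
  def countAll(input: DataStream[String]): DataStream[Long] =
    input
      .assignAscendingTimestamps(_ => 0L) // constant timestamp: one window for everything
      .map(_ => 1L)                       // emit a 1 for every record
      .timeWindowAll(Time.seconds(180))   // an event-time window, given the setting in main
      .reduce(_ + _)                      // sum the ones to get the count

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Stand-in for the Parquet source (assumption).
    val lines: DataStream[String] = env.fromElements("a", "b", "c")
    countAll(lines).print()
    env.execute("count-lines")
  }
}
```

The window size no longer matters much here: the result is emitted at the end of the input, not after 180 seconds of wall-clock time.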