apache-flink flink-streaming

Proper way to assign watermarks to a DataStreamSource<List<T>> in Flink


I have a continuous stream of JSONArray data being produced to a Kafka topic, and I want to process the records in event time. To do that, I have to assign timestamps and watermarks to the individual records contained in each JSONArray.

I didn't find a convenient way to do this. My solution is to consume the data as a DataStreamSource<List<MockData>>, iterate over each List and emit its elements downstream with an anonymous ProcessFunction, and finally assign watermarks to that downstream stream.

The main code is shown below:

    DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);

    SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
        .process(new ProcessFunction<List<MockData>, MockData>() {
            @Override
            public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
                    throws Exception {
                value.forEach(mockData -> out.collect(mockData));
            }
        });

    convertToPojo.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(MockData element) {
                return element.getTimestamp();
            }
        });

    SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
        .keyBy("country")
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
        .process(new FlinkEventTimeCountFunction())
        .name("count elements");

The code looks correct and runs without errors, but the ProcessWindowFunction is never triggered. I traced through the Flink source code and found that EventTimeTrigger never returns TriggerResult.FIRE, because TriggerContext.getCurrentWatermark always returns Long.MIN_VALUE.

What's the proper way to process a List in event time? Any suggestions would be appreciated.


Solution

  • The problem is that you are applying the keyBy and window operations to the convertToPojo stream, rather than the stream with timestamps and watermarks (which you didn't assign to a variable).

    If you write the code more or less like this, it should work:

    listDataStreamSource = KafkaSource ...
    convertToPojo = listDataStreamSource.process ...
    pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
    countStream = pojoPlusWatermarks.keyBy ...
    

    Calling assignTimestampsAndWatermarks on the convertToPojo stream does not modify that stream, but rather creates a new datastream object that includes timestamps and watermarks. You need to apply your windowing to that new datastream.
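
The effect of discarding that return value can be reproduced without Flink at all. The following is a minimal plain-Java sketch, not Flink code: `ToyStream`, `assignWatermarks`, `currentWatermark` and the sample timestamps are all hypothetical names and values invented for illustration. It assumes a simplified watermark (largest timestamp seen minus the out-of-orderness bound) and a single window covering [0, 10000) ms that fires once the watermark reaches the window's last timestamp.

```java
import java.util.List;

// Toy model only: ToyStream is a hypothetical stand-in, NOT Flink's DataStream.
final class WatermarkReturnValueDemo {

    /** Immutable stream handle: every transformation returns a new object. */
    static final class ToyStream {
        private final List<Long> timestamps;
        private final boolean hasWatermarks;

        ToyStream(List<Long> timestamps, boolean hasWatermarks) {
            this.timestamps = timestamps;
            this.hasWatermarks = hasWatermarks;
        }

        /** Same shape as assignTimestampsAndWatermarks: the receiver is left unchanged. */
        ToyStream assignWatermarks() {
            return new ToyStream(timestamps, true);
        }

        /** Simplified watermark: max timestamp seen minus the out-of-orderness bound. */
        long currentWatermark(long maxOutOfOrdernessMs) {
            if (!hasWatermarks || timestamps.isEmpty()) {
                return Long.MIN_VALUE; // no watermark generator upstream
            }
            long max = Long.MIN_VALUE;
            for (long ts : timestamps) {
                max = Math.max(max, ts);
            }
            return max - maxOutOfOrdernessMs;
        }
    }

    /** A window [0, 10000) fires once the watermark reaches its last timestamp, 9999. */
    static boolean windowFires(ToyStream stream) {
        long windowMaxTimestamp = 10_000L - 1;
        return stream.currentWatermark(5_000L) >= windowMaxTimestamp;
    }

    /** Mirrors the question: the stream with watermarks is created and thrown away. */
    static boolean buggyPipelineFires() {
        ToyStream convertToPojo = new ToyStream(List.of(1_000L, 8_000L, 16_000L), false);
        convertToPojo.assignWatermarks(); // result discarded
        return windowFires(convertToPojo);
    }

    /** Mirrors the answer: the window is applied to the returned stream. */
    static boolean fixedPipelineFires() {
        ToyStream convertToPojo = new ToyStream(List.of(1_000L, 8_000L, 16_000L), false);
        ToyStream pojoPlusWatermarks = convertToPojo.assignWatermarks();
        return windowFires(pojoPlusWatermarks);
    }

    public static void main(String[] args) {
        System.out.println("buggy pipeline fires: " + buggyPipelineFires());
        System.out.println("fixed pipeline fires: " + fixedPipelineFires());
    }
}
```

In the buggy variant the window only ever sees Long.MIN_VALUE as its watermark, which matches what the question observed in EventTimeTrigger. In the fixed variant the watermark is 16000 - 5000 = 11000, which is past 9999, so the window fires.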