
What does the TimestampsAndWatermarksTransformation class do in assignTimestampsAndWatermarks()?


Consider the following method from DataStream:


public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy) {
    final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
    // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
    // and chain
    final int inputParallelism = getTransformation().getParallelism();
    final TimestampsAndWatermarksTransformation<T> transformation =
            new TimestampsAndWatermarksTransformation<>(
                    "Timestamps/Watermarks",
                    inputParallelism,
                    getTransformation(),
                    cleanedStrategy);
    getExecutionEnvironment().addOperator(transformation);
    return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}

assignTimestampsAndWatermarks() is called on the input stream and assigns timestamps and watermarks according to the WatermarkStrategy passed as its parameter. It returns a SingleOutputStreamOperator, i.e. the resulting stream whose records carry the generated timestamps and watermarks.
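To make "assigns timestamps and watermarks according to the strategy" concrete, here is a dependency-free Java sketch of what a bounded-out-of-orderness strategy computes for each record. It only mimics the idea behind WatermarkStrategy.forBoundedOutOfOrderness: the names WatermarkSketch, Stamped, BoundedOutOfOrderness and process are hypothetical, and for simplicity it computes a watermark after every record, whereas Flink's periodic generators emit watermarks on a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical, dependency-free model of a timestamps/watermarks step using a
// bounded-out-of-orderness rule. None of these names are Flink's API.
class WatermarkSketch {

    /** A record paired with the event timestamp extracted from it. */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Tracks the highest timestamp seen and derives a watermark from it. */
    static final class BoundedOutOfOrderness {
        private final long maxOutOfOrderness;
        private long maxTimestamp;

        BoundedOutOfOrderness(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Start low enough that computing the first watermark cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        /** Events with a timestamp <= this value are no longer expected. */
        long currentWatermark() {
            return maxTimestamp - maxOutOfOrderness - 1;
        }
    }

    /**
     * Assigns a timestamp to every record (collected into {@code out}) and returns
     * the watermark computed after each record.
     */
    static <T> List<Long> process(List<T> input, ToLongFunction<T> extractor,
                                  long maxOutOfOrderness, List<Stamped<T>> out) {
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(maxOutOfOrderness);
        List<Long> watermarks = new ArrayList<>();
        for (T element : input) {
            long ts = extractor.applyAsLong(element);
            out.add(new Stamped<>(element, ts)); // timestamp assignment
            generator.onEvent(ts);               // watermark bookkeeping
            watermarks.add(generator.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Stamped<Long>> stamped = new ArrayList<>();
        List<Long> watermarks = process(List.of(1000L, 3000L, 2000L), x -> x, 500, stamped);
        System.out.println(watermarks); // prints [499, 2499, 2499]
    }
}
```

With a bound of 500 ms, the late record with timestamp 2000 still gets its timestamp attached, but it does not move the watermark backwards: it stays at 2499.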

My question is: what does TimestampsAndWatermarksTransformation do here internally, and what is the effect of the line getExecutionEnvironment().addOperator(transformation);?


Solution

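  • When you call assignTimestampsAndWatermarks on a stream, this code creates a TimestampsAndWatermarksTransformation and registers it with the execution environment via addOperator. When the job is executed, the environment's list of transformations is translated into the stream graph (and from there into the job graph), where this transformation becomes an operator that extracts timestamps and generates watermarks. In other words, this is what wires things up so that the specified watermarking actually gets done.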
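The graph-building side can be pictured with a hypothetical mini-model (TransformationSketch, Transformation, Environment, Stream and execute() are illustrative names, not Flink classes). In it, assignTimestampsAndWatermarks only creates a node that points at its input with the same parallelism, registers it with the environment (the counterpart of addOperator) and returns a new stream handle; nothing is processed until execute() turns the registered list into a graph.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-model of how a DataStream-style API records transformations.
// These classes only mirror the shape of the quoted Flink method; they are not Flink's.
class TransformationSketch {

    /** One node of the plan: a name, a parallelism and its (optional) input. */
    static final class Transformation {
        final String name;
        final int parallelism;
        final Transformation input;

        Transformation(String name, int parallelism, Transformation input) {
            this.name = name;
            this.parallelism = parallelism;
            this.input = input;
        }
    }

    /** Collects registered transformations; nothing runs until execute() is called. */
    static final class Environment {
        final List<Transformation> transformations = new ArrayList<>();

        void addOperator(Transformation t) {
            transformations.add(t);
        }

        /** Stand-in for graph generation: one "input -> node" edge per registered node. */
        List<String> execute() {
            List<String> edges = new ArrayList<>();
            for (Transformation t : transformations) {
                String from = (t.input == null) ? "<none>" : t.input.name;
                edges.add(from + " -> " + t.name + " (p=" + t.parallelism + ")");
            }
            return edges;
        }
    }

    /** A stream is only a handle on the transformation that produces it. */
    static final class Stream {
        final Environment env;
        final Transformation transformation;

        Stream(Environment env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        Stream assignTimestampsAndWatermarks() {
            // Reuse the input's parallelism (in Flink this allows a 1:1, chainable link).
            Transformation t = new Transformation(
                    "Timestamps/Watermarks", transformation.parallelism, transformation);
            env.addOperator(t); // register the node, like getExecutionEnvironment().addOperator(...)
            return new Stream(env, t);
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        Stream source = new Stream(env, new Transformation("Source", 4, null));
        source.assignTimestampsAndWatermarks();
        System.out.println(env.execute()); // prints [Source -> Timestamps/Watermarks (p=4)]
    }
}
```

Calling assignTimestampsAndWatermarks() here only grows the registered list; the edge appears when execute() walks it, which mirrors how Flink builds the pipeline lazily and only runs it on env.execute().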

    Internally, there are two kinds of Transformation: (1) physical transformations, such as map or assignTimestampsAndWatermarks, which are turned into operators that process the stream records, and (2) logical transformations, such as union, which only affect the topology and do not become operators of their own.
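    The difference between physical and logical transformations can be sketched the same way. In this hypothetical model (PhysicalVsLogicalSketch, Node, resolve and translate are illustrative, not Flink code), translating the plan creates an operator only for physical nodes; a logical union creates nothing itself, and its downstream operator is connected directly to the union's inputs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: physical nodes become operators, logical nodes (e.g. union)
// only redirect edges. These classes are illustrative, not Flink's.
class PhysicalVsLogicalSketch {

    static final class Node {
        final String name;
        final boolean physical;
        final List<Node> inputs;

        Node(String name, boolean physical, List<Node> inputs) {
            this.name = name;
            this.physical = physical;
            this.inputs = inputs;
        }
    }

    /** The physical nodes that actually feed records through {@code n}. */
    static List<Node> resolve(Node n) {
        if (n.physical) {
            return List.of(n);
        }
        List<Node> upstream = new ArrayList<>();
        for (Node in : n.inputs) {
            upstream.addAll(resolve(in));
        }
        return upstream;
    }

    /** Translates the plan ending at {@code sink}: fills {@code operators}, returns edges. */
    static List<String> translate(Node sink, Set<String> operators) {
        List<String> edges = new ArrayList<>();
        visit(sink, new HashSet<>(), operators, edges);
        return edges;
    }

    private static void visit(Node n, Set<Node> visited, Set<String> operators, List<String> edges) {
        if (!visited.add(n)) {
            return; // translate each node only once
        }
        for (Node in : n.inputs) {
            visit(in, visited, operators, edges);
        }
        if (!n.physical) {
            return; // logical node: no operator and no edges of its own
        }
        operators.add(n.name);
        for (Node in : n.inputs) {
            for (Node up : resolve(in)) {
                edges.add(up.name + " -> " + n.name);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("Source A", true, List.of());
        Node b = new Node("Source B", true, List.of());
        Node union = new Node("union", false, List.of(a, b));
        Node map = new Node("map", true, List.of(union));
        Set<String> operators = new LinkedHashSet<>();
        List<String> edges = translate(map, operators);
        System.out.println(operators); // prints [Source A, Source B, map]
        System.out.println(edges);     // prints [Source A -> map, Source B -> map]
    }
}
```

    The union never shows up as an operator; it only determines that map receives records from both sources.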