In the following code
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
// match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
// and chain
final int inputParallelism = getTransformation().getParallelism();
final TimestampsAndWatermarksTransformation<T> transformation =
new TimestampsAndWatermarksTransformation<>(
"Timestamps/Watermarks",
inputParallelism,
getTransformation(),
cleanedStrategy);
getExecutionEnvironment().addOperator(transformation);
return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}
The assignTimestampsAndWatermarks()
receives the main stream and assigns timestamps and watermarks based on the strategy specified in params, at the end, it will return SingleOutputStreamOperator
which is the updated stream with timestamps and watermarks generated.
My question is, what TimestampsAndWatermarksTransformation
does here (internally) and what is the effect of this line getExecutionEnvironment().addOperator(transformation);
as well.
When you call assignTimestampsAndWatermarks
on a stream, this code adds an operator to the job graph to do the timestamp extraction and watermark generation. This is wiring things up so that the specified watermarking will actually get done.
Internally there are two types of Transformation: (1) physical transformations, such as map
or assignTimestampsAndWatermarks
, which alter the stream records, and (2) logical transformations, such as union
, that only affect the topology.