I am currently trying to upgrade a method call assignTimestampsAndWatermarks
that is applied to a data stream. The data stream looks something like this:
DataStream<Auction> auctions = env.addSource(new AuctionSourceFunction(auctionSrcRates))
.name("Custom Source")
.setParallelism(params.getInt("p-auction-source", 1))
.assignTimestampsAndWatermarks(new AuctionTimestampAssigner());
The AssignerWithPeriodicWatermark looks like this:
private static final class AuctionTimestampAssigner implements AssignerWithPeriodicWatermarks<Auction> {
private long maxTimestamp = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
@Override
public long extractTimestamp(Auction element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element.dateTime);
return element.dateTime;
}
}
What are the steps I would need to take to upgrade from deprecated calls to the current best practices? Thanks.
Your watermark generator assumes that the events are in order, by timestamp, or at least accepts that any out-of-order events will be late. This is equivalent to
assignTimestampsAndWatermarks(
WatermarkStrategy
.<Auction>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.dateTime))