Search code examples
apache-flinkupgradedeprecatedwatermark

Upgrading Flink deprecated function calls


I am currently trying to upgrade a method call assignTimestampsAndWatermarks that is applied to a data stream. The data stream looks something like this:

DataStream<Auction> auctions = env.addSource(new AuctionSourceFunction(auctionSrcRates))
        .name("Custom Source")
        .setParallelism(params.getInt("p-auction-source", 1))
        .assignTimestampsAndWatermarks(new AuctionTimestampAssigner());

The AssignerWithPeriodicWatermark looks like this:

private static final class AuctionTimestampAssigner implements AssignerWithPeriodicWatermarks<Auction> {
        private long maxTimestamp = Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestamp);
        }

        @Override
        public long extractTimestamp(Auction element, long previousElementTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, element.dateTime);
            return element.dateTime;
        }
    }

What are the steps I would need to take to upgrade from deprecated calls to the current best practices? Thanks.


Solution

  • Your watermark generator assumes that the events are in order, by timestamp, or at least accepts that any out-of-order events will be late. This is equivalent to

    assignTimestampsAndWatermarks(
        WatermarkStrategy
          .<Auction>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.dateTime))