StreamTask.getCheckpointLock deprecation and custom Flink sources


When writing custom checkpointed sources for Flink, one must make sure that emitting elements downstream, checkpointing, and emitting watermarks happen in a synchronized fashion. This is done by acquiring the lock returned by SourceContext.getCheckpointLock.

Flink 1.10 deprecated StreamTask.getCheckpointLock and now recommends using MailboxExecutor for operations that require such synchronization.

I have a custom source implementation that is split into multiple phases: a SourceFunction[T] for reading file locations, and a OneInputStreamOperator for downloading those files and emitting their elements downstream. Up until now, I used StreamSourceContexts.getSourceContext to obtain the SourceContext used to emit elements, which looked as follows:

ctx = StreamSourceContexts.getSourceContext(
  getOperatorConfig.getTimeCharacteristic,
  getProcessingTimeService,
  getContainingTask.getCheckpointLock, // the deprecated accessor in question
  getContainingTask.getStreamStatusMaintainer,
  output,
  getRuntimeContext.getExecutionConfig.getAutoWatermarkInterval,
  -1 // idle timeout; -1 disables idleness detection
)

And this context is being used throughout the code to emit elements and watermarks:

ctx.getCheckpointLock.synchronized(ctx.collect(item))
ctx.getCheckpointLock.synchronized(ctx.emitWatermark(watermark))

Is using the checkpoint lock still the preferred way to emit elements downstream? Or is it now recommended to use MailboxExecutor instead and perform element collection and watermark emission inside the mailbox execution thread?


Solution

  • The checkpoint lock in the source context is not deprecated, because there is currently no way to implement a source without it. These sources are already dubbed legacy sources for exactly that reason: they spawn their own thread and need the lock to emit data (push-based).

    A larger rework of sources (FLIP-27) is currently underway and will offer a pull-based interface. That interface is called from the main task thread, so no synchronization is necessary anymore. If some async work needs to be done, then MailboxExecutor is the way to go.

    FYI, new operators should (or rather, must) use only MailboxExecutor instead of the checkpoint lock.
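
To illustrate why the mailbox model removes the need for a lock, here is a minimal, dependency-free sketch. It does not use Flink's API: ToyMailbox, MailboxSketch, and the upper-casing "download" are hypothetical stand-ins invented for this example. The idea it models is that a helper thread never emits directly; it only enqueues an action, and the single task thread runs those actions one at a time, so emission cannot interleave with anything else running on that thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a mailbox: a queue of actions drained by one thread.
// This is NOT Flink's MailboxExecutor; it only models the threading idea.
final class ToyMailbox {
  private val queue = new LinkedBlockingQueue[Runnable]()

  // Safe to call from any thread: enqueue an action for the task thread.
  def execute(action: Runnable): Unit = queue.put(action)

  // Called only by the task thread: run the next `n` actions in arrival order.
  def drain(n: Int): Unit = (1 to n).foreach(_ => queue.take().run())
}

object MailboxSketch {
  // Performs a fake download on a helper thread, but emits only on the
  // calling thread, which plays the role of the main task thread.
  def run(items: Seq[String]): List[String] = {
    val mailbox = new ToyMailbox
    val emitted = ArrayBuffer.empty[String] // touched only by the task thread

    val downloader = new Thread(new Runnable {
      override def run(): Unit = items.foreach { item =>
        val downloaded = item.toUpperCase // placeholder for the async work
        // Hand the emit action to the task thread instead of taking a lock.
        mailbox.execute(new Runnable {
          override def run(): Unit = { emitted += downloaded; () }
        })
      }
    })
    downloader.start()

    mailbox.drain(items.size) // task thread runs the emit actions one by one
    downloader.join()
    emitted.toList
  }
}
```

Calling MailboxSketch.run(Seq("a", "b", "c")) collects the items in submission order without any synchronized block. In a real operator, the enqueued action would be the call that collects the element or emits the watermark, and the executor would be the one provided by Flink rather than this toy class.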