
Controlling order of processed elements within CoProcessFunction using custom sources


For testing purposes, I am using the following custom source:

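import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits `data` one element at a time: waits `beginWaitingTime` ms before the
// first element, `throttling` ms between elements and `endWaitingTime` ms after the last.
class ThrottledSource[T](
  data: Array[T],
  throttling: Int,
  beginWaitingTime: Int = 0,
  endWaitingTime: Int = 0
) extends SourceFunction[T] {

  // Set by cancel() from another thread, so it must be volatile
  @volatile private var isRunning = true
  private var offset = 0

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    Thread.sleep(beginWaitingTime)

    val lock = ctx.getCheckpointLock

    while (isRunning && offset < data.length) {
      // Hold the checkpoint lock while emitting, as the SourceFunction contract recommends
      lock.synchronized {
        ctx.collect(data(offset))
        offset += 1
      }
      Thread.sleep(throttling)
    }

    Thread.sleep(endWaitingTime)
  }

  override def cancel(): Unit = isRunning = false
}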

I then use it like this in my test:

import org.apache.flink.streaming.api.scala._ // implicit TypeInformation for addSource

val controlSource = new ThrottledSource[Control](
  data = Array(c1, c2),
  throttling = 0,
  endWaitingTime = 10000
)

val eventSource = new ThrottledSource[Event](
  data = Array(e1, e2, e3, e4, e5),
  throttling = 1000,
  beginWaitingTime = 2000,
  endWaitingTime = 2000
)

val dataStream = env.addSource(eventSource)

env.addSource(controlSource)
  .connect(dataStream)
  .process(MyProcessFunction)

My intent is to receive all the control elements first (which is why the control source has no beginWaitingTime and zero throttling). In processElement1 and processElement2 of MyProcessFunction I print each element as it arrives. Most of the time the two control elements arrive first, as expected, but, quite surprisingly to me, every now and then data elements arrive first, despite the two-second delay (beginWaitingTime = 2000) before the data source starts emitting. Can anyone explain this?


Solution

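  • The control and data source operators run in different threads, and, as you've seen, there's no guarantee that the source instance running the control stream will get a chance to run before the instance running the data stream. The Thread.sleep in the data source only delays it relative to when its own thread starts running; it says nothing about when the control source's thread is scheduled, so a fixed delay cannot impose an ordering between the two.

    You could look at the answer here and its associated code on GitHub for one way to accomplish this reliably.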
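One common way to get the ordering reliably (a sketch, not necessarily what the linked answer does) is to stop relying on timing and have the process function hold back data elements until the control elements have arrived. The plain-Scala sketch below shows only that buffering logic. `ControlFirstBuffer` and `expectedControls` are hypothetical names, and it assumes you know up front how many control elements to wait for (an end-of-control marker would work as well). In a real CoProcessFunction, `onControl` and `onEvent` would be called from processElement1 and processElement2, and the buffer would live in Flink managed state (for example keyed ListState after keying both streams) so that it survives failures.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: holds back data elements until `expectedControls`
// control elements have been seen, then releases them in arrival order.
final class ControlFirstBuffer[C, E](expectedControls: Int) {
  private val controls = ArrayBuffer.empty[C]
  private val pending = ArrayBuffer.empty[E]

  // True once all expected control elements have arrived
  def ready: Boolean = controls.size >= expectedControls

  // Would be driven by processElement1: records the control element and
  // returns any data elements that can now be processed.
  def onControl(c: C): Seq[E] = {
    controls += c
    if (ready) {
      val released = pending.toList
      pending.clear()
      released
    } else Nil
  }

  // Would be driven by processElement2: passes the element through if the
  // controls are complete, otherwise buffers it and returns nothing.
  def onEvent(e: E): Seq[E] =
    if (ready) List(e)
    else {
      pending += e
      Nil
    }

  def controlsSeen: Seq[C] = controls.toList
}
```

For example, with `expectedControls = 2`, a data element that arrives before the second control element is returned only from the `onControl` call for that second control element, and later data elements pass straight through.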