For testing purposes, I am using the following custom source:
    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class ThrottledSource[T](
      data: Array[T],
      throttling: Int,
      beginWaitingTime: Int = 0,
      endWaitingTime: Int = 0
    ) extends SourceFunction[T] {

      // cancel() is called from a different thread than run()
      @volatile private var isRunning = true
      private var offset = 0

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
        Thread.sleep(beginWaitingTime)
        val lock = ctx.getCheckpointLock
        while (isRunning && offset < data.length) {
          // emit and advance the offset under the checkpoint lock
          lock.synchronized {
            ctx.collect(data(offset))
            offset += 1
          }
          Thread.sleep(throttling)
        }
        Thread.sleep(endWaitingTime)
      }

      override def cancel(): Unit = isRunning = false
    }

and I use it like this within my test:
    val controlSource = new ThrottledSource[Control](
      data = Array(c1, c2),
      throttling = 0,
      endWaitingTime = 10000
    )
    val dataSource = new ThrottledSource[Event](
      data = Array(e1, e2, e3, e4, e5),
      throttling = 1000,
      beginWaitingTime = 2000,
      endWaitingTime = 2000
    )
    val dataStream = env.addSource(dataSource)
    env.addSource(controlSource)
      .connect(dataStream)
      .process(MyProcessFunction)

My intent is to get all the control elements first (that is why I use neither a beginWaitingTime nor any throttling for the control source). In processElement1 and processElement2 within MyProcessFunction I print the elements as I receive them. Most of the time I get the two control elements first, as expected, but surprisingly, from time to time I get data elements first, despite the two-second delay before the data source starts emitting its elements. Can anyone explain this to me?

The control and data stream source operators are running in different threads, and as you've seen, there's no guarantee that the source instance running the control stream will get a chance to run before the instance running the data stream.
You could look at the answer here and its associated code on GitHub for one way to accomplish this reliably.
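
Separately from the linked answer, one common way to avoid depending on thread scheduling is to hold data elements back inside the process function until the control elements have arrived. The following is only a minimal sketch of that buffering logic in plain Scala, with no Flink dependency so that it runs as-is. The class `ControlFirstBuffer`, its methods `onControl` and `onEvent`, and the idea that the number of expected control elements is known up front (two in the test above) are all assumptions made for illustration, not something taken from the question or from Flink's API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Flink: holds data elements back until
// the expected number of control elements has been seen.
// Assumption: the number of control elements is known in advance.
class ControlFirstBuffer[C, E](expectedControls: Int) {
  private val controls = ArrayBuffer.empty[C]
  private val pending = ArrayBuffer.empty[E]

  private def ready: Boolean = controls.size >= expectedControls

  // Would be called from processElement1. Returns the buffered data elements
  // that become releasable because of this control element (possibly none).
  def onControl(c: C): Seq[E] = {
    controls += c
    if (ready) {
      val released = pending.toList
      pending.clear()
      released
    } else Nil
  }

  // Would be called from processElement2. Returns the element itself once
  // all control elements are in; otherwise buffers it and returns nothing.
  def onEvent(e: E): Seq[E] =
    if (ready) List(e)
    else {
      pending += e
      Nil
    }
}

object ControlFirstBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new ControlFirstBuffer[String, String](expectedControls = 2)
    // Simulate the problematic interleaving: a data element arrives first.
    println(buffer.onEvent("e1"))   // List()
    println(buffer.onControl("c1")) // List()
    println(buffer.onControl("c2")) // List(e1)
    println(buffer.onEvent("e2"))   // List(e2)
  }
}
```

In an actual job, processElement1 and processElement2 would delegate to logic like this and emit the returned elements through the collector. The in-memory buffers shown here would not survive a failure, so they would more likely be kept in Flink managed state (for example a ListState on a keyed stream); that wiring is left out of the sketch.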