Is it possible to emit data according to a defined clock with Akka Streams? Or do they just emit (ignoring backpressure) as fast as their data arrive? I'm particularly wondering if it's possible to playback historical data with a "mocked" clock, somehow using Source.tick
perhaps.
It depends on what you mean by "defined clock".
Actual Wall Time
As you mentioned Source.tick
is one possibility to get a clock of actual times coming from the system clock. The problem is that the Sink
may not signal demand at a rate that is greater than or equal to the interval that the Source generates ticks. For example, your Sink may only signal demand once every minute but your interval in Source.tick may be 10 seconds
. In this case the 5 intermediate ticks will be dropped, from the documentation:
If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later.
Simulated Time
It is always possible to simulate time using a Source
.
We can first create a function that will simulate a clock using a start time, end time, and interval:
type MillisFromEpoch = Long
type MillisInterval = Long
val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] =
(startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] {
var currentTime = startTime
override def hasNext : Boolean = currentTime < stopTime
override def next() : MillisFromEpoch = {
val returnMilis = currentTime
currentTime += interval
return returnMillis
}
}
This clock can now feed a Source. As an example we can create a clock that start at unix epoch and increments 1 second until the end of time:
val epoch : MillisFromEpoch = 0L
val second : MillisInterval = 1000L
val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(epoch, Long.MaxValue, 1*second)
Or we can create a clock that starts now, and ends in 60 seconds incrementing by 5 second intervals:
val now : MillisFromEpoch = System.currentTimeMillis()
val simulatedClockFromNowSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(now, now + 60*second, 5*second)
Sampling Frequency
There is a way to use Source.tick even when the downstream consumer is slower than the tick interval specified at the Source. We can create a Flow.filter
that is constantly signaling demand to the Source but will only pass through times that are a defined increment apart.
We can start with a function that does the tracking of the time interval with an internal variable:
val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean =
(interval) => {
var lastValidTime : MillisFromEpoch = -1
(timeToCheck) => {
if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
lastValidTime = timeToCheck
true
}
else {
false
}
}
}
And now this function can be used to create the Flow:
val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] =
(frequency) => Flow[MillisFromEpoch] filter frequencySample(frequency)
Now we can create a Flow that has a slow frequency (e.g. 10 seconds) that is attached to a Source with a higher frequency (e.g. 1 second):
val slowFrequency : MillisInterval = 10 * second
//simulatedClockFromEpoch ticks every 1 second
//frequnencySampleFlow only passes every 10 second tick through
val slowSource =
simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)