
Playing back historical data with Akka Streams


Is it possible to emit data according to a defined clock with Akka Streams? Or do streams just emit as fast as their data arrive (backpressure aside)? I'm particularly wondering whether it's possible to play back historical data with a "mocked" clock, perhaps somehow using Source.tick.


Solution

  • It depends on what you mean by "defined clock".

    Actual Wall Time

    As you mentioned, Source.tick is one way to get a clock driven by actual time from the system clock (note that it emits the same tick element each time, not a timestamp). The problem is that the Sink may signal demand less often than the Source produces ticks. For example, your Sink may only signal demand once every minute while the interval in Source.tick is 10 seconds. In that case the 5 intermediate ticks in each minute are dropped; from the documentation:

    If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later.
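
    To make the dropping rule concrete, here is a small pure-Scala model of it. This is a simplified simulation written for illustration, not Akka's implementation: `deliveredTicks` is a hypothetical helper, and it assumes, as the quoted documentation describes, that a tick reaches the consumer only if a request is outstanding at the moment the tick is produced.

```scala
// Simplified model of the documented Source.tick rule (not Akka's code):
// a tick is delivered only if the consumer has an outstanding request at
// the instant the tick is produced; otherwise it is dropped for good.
def deliveredTicks(tickIntervalMs: Long, requestTimesMs: Seq[Long], untilMs: Long): Seq[Long] = {
  val tickTimes = 0L until untilMs by tickIntervalMs
  var pendingRequests = requestTimesMs.sorted.toList
  var outstanding = 0
  val delivered = List.newBuilder[Long]
  for (t <- tickTimes) {
    // Register every request the consumer made at or before this tick.
    while (pendingRequests.nonEmpty && pendingRequests.head <= t) {
      outstanding += 1
      pendingRequests = pendingRequests.tail
    }
    if (outstanding > 0) { // demand available: the tick is delivered
      outstanding -= 1
      delivered += t
    }                      // no demand: the tick is lost
  }
  delivered.result()
}

// Ticks every 10 s for 3 minutes; the consumer asks for one element per minute.
val received = deliveredTicks(10000L, Seq(0L, 60000L, 120000L), 180000L)
val dropped = (0L until 180000L by 10000L).size - received.size
```

    With a 10-second tick and one request per minute, only the ticks at 0 s, 60 s and 120 s get through, and 15 of the 18 ticks (5 per minute) are lost.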

    Simulated Time

    Alternatively, you can always simulate time with a Source that emits timestamps you generate yourself.

    We can first create a function that simulates a clock from a start time, a stop time (exclusive), and an interval:

    type MillisFromEpoch = Long
    
    type MillisInterval = Long
    
    
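    // Returns a factory: each call creates a fresh iterator over
    // startTime, startTime + interval, ... up to (but excluding) stopTime.
    val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] = 
      (startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] {
        var currentTime = startTime
    
        override def hasNext : Boolean = currentTime < stopTime
    
        override def next() : MillisFromEpoch = {
          // emit the current time, then advance the clock by one interval
          val returnMillis = currentTime
          currentTime += interval
          returnMillis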
        }
      }
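
    The same clock can also be written with the standard library's `Iterator.iterate`. This is a sketch of an equivalent formulation, not something Akka requires: it produces the same sequence (start inclusive, stop exclusive). Because `clock` returns a factory rather than a single iterator, each call yields a fresh iterator; `Source.fromIterator` takes exactly such a `() => Iterator` factory, so every materialization of the Source starts the clock from the beginning.

```scala
type MillisFromEpoch = Long
type MillisInterval = Long

// start, start + interval, ... while the time stays below stopTime (exclusive).
val clock: (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] =
  (startTime, stopTime, interval) =>
    () => Iterator.iterate(startTime)(_ + interval).takeWhile(_ < stopTime)

// Each call of the factory returns an independent iterator,
// so the second replay starts from 0 again.
val factory = clock(0L, 3000L, 1000L)
val firstReplay = factory().toList
val secondReplay = factory().toList
```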
    

    This clock can now feed a Source. For example, we can create a clock that starts at the Unix epoch and advances in 1-second steps until Long.MaxValue, i.e. effectively forever:

    import akka.stream.scaladsl.Source
    
    val epoch : MillisFromEpoch = 0L
    
    val second : MillisInterval = 1000L
    
    val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] = 
      Source fromIterator clock(epoch, Long.MaxValue, 1*second)
    

    Or we can create a clock that starts now and ends 60 seconds later, advancing in 5-second steps:

    val now : MillisFromEpoch = System.currentTimeMillis()
    
    val simulatedClockFromNowSource : Source[MillisFromEpoch,_] = 
      Source fromIterator clock(now, now + 60*second, 5*second)
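
    Note that a Source built from the simulated clock is not paced: like any iterator-backed Source, it emits as fast as downstream demands. If the playback should unfold at a controlled speed, one option (not part of the answer above) is Akka's `throttle`. The sketch below assumes akka-stream 2.6 on the classpath (the `//> using` lines are scala-cli directives) and replays five simulated seconds at ten times real speed; `simulatedClock` is a hypothetical stand-in for `clock`.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for `clock`: simulated times in millis, stop exclusive.
def simulatedClock(start: Long, stop: Long, step: Long): () => Iterator[Long] =
  () => Iterator.iterate(start)(_ + step).takeWhile(_ < stop)

implicit val system: ActorSystem = ActorSystem("replay")

// One simulated second is released per 100 ms of wall time (10x speed).
// Without .throttle the five elements would be emitted as fast as Sink.seq asks.
val startedAt = System.nanoTime()
val replayed: Seq[Long] = Await.result(
  Source.fromIterator(simulatedClock(0L, 5000L, 1000L))
    .throttle(1, 100.millis)
    .runWith(Sink.seq),
  10.seconds
)
val elapsedMillis: Long = (System.nanoTime() - startedAt) / 1000000L

Await.ready(system.terminate(), 10.seconds)
```

    The simulated timestamps are unchanged; only the wall-clock spacing between them is controlled, so the same source can be replayed faster or slower by changing the `per` duration.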
    

    Sampling Frequency

    There is a way to use a fast clock, such as Source.tick mapped to the current time, even when the downstream consumer is slower than the tick interval. A Flow.filter pulls the next element from upstream every time it discards one, so while the consumer has outstanding demand the filter keeps consuming ticks and passes through only times that are at least a defined interval apart.

    We can start with a function that tracks the last accepted time in an internal variable:

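    // Returns a stateful predicate: a time is accepted only if it is at least
    // `interval` after the last accepted time (the first time is always accepted).
    val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean = 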
      (interval) => {
    
        var lastValidTime : MillisFromEpoch = -1
    
        (timeToCheck) => {
          if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
            lastValidTime = timeToCheck
    
            true
          }
          else {
            false
          }
        }
      }
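
    A quick pure-Scala check of the predicate, using hypothetical, irregularly spaced timestamps: a time is kept when it is at least `interval` after the last kept time, which is not the same as keeping every tenth element. The sketch also shows that one predicate instance keeps its state between uses, which matters when the same Flow is run more than once.

```scala
type MillisFromEpoch = Long
type MillisInterval = Long

// Same logic as frequencySample above.
val frequencySample: MillisInterval => MillisFromEpoch => Boolean =
  (interval) => {
    var lastValidTime: MillisFromEpoch = -1L
    (timeToCheck) => {
      val accept = lastValidTime < 0 || timeToCheck >= lastValidTime + interval
      if (accept) lastValidTime = timeToCheck
      accept
    }
  }

// Hypothetical, irregularly spaced historical timestamps (millis).
val recorded: List[MillisFromEpoch] =
  List(0L, 4000L, 9999L, 10000L, 15000L, 21000L, 30999L, 31000L)

// A fresh predicate: keeps 0, 10000, 21000 and 31000.
val sampled = recorded.filter(frequencySample(10000L))

// One predicate reused twice: the second pass remembers lastValidTime = 31000
// from the first pass, so nothing in the replay qualifies.
val sharedPredicate = frequencySample(10000L)
val firstPass = recorded.filter(sharedPredicate)
val secondPass = recorded.filter(sharedPredicate)
```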
    

    And now this function can be used to create the Flow:

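    import akka.stream.scaladsl.Flow
    
    // The predicate (and its lastValidTime) is created once per call to
    // frequencySampleFlow, so every materialization of that Flow shares it.
    val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] =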
      (frequency) => Flow[MillisFromEpoch]  filter frequencySample(frequency)
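
    Because `Flow.filter` keeps the single predicate it was given, every materialization of a Flow returned by `frequencySampleFlow` shares the same `lastValidTime`, so running it twice makes the second run start from where the first stopped. One way to give each materialization its own state is `statefulMapConcat`, whose factory function is invoked once per materialization. A sketch assuming akka-stream 2.6 via scala-cli; `freshFrequencySampleFlow` is a hypothetical name:

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

type MillisFromEpoch = Long
type MillisInterval = Long

// Same rule as frequencySample, but the mutable "last accepted time" is
// created inside the factory, so every materialization starts fresh.
def freshFrequencySampleFlow(interval: MillisInterval): Flow[MillisFromEpoch, MillisFromEpoch, NotUsed] =
  Flow[MillisFromEpoch].statefulMapConcat[MillisFromEpoch] { () =>
    var lastValidTime: MillisFromEpoch = -1L
    (timeToCheck: MillisFromEpoch) =>
      if (lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
        lastValidTime = timeToCheck
        List(timeToCheck) // keep the element
      } else Nil          // drop the element
  }

implicit val system: ActorSystem = ActorSystem("sampling")

// A 1-second simulated clock, sampled every 10 seconds.
val slow: Source[MillisFromEpoch, NotUsed] =
  Source.fromIterator(() => Iterator.iterate(0L)(_ + 1000L))
    .via(freshFrequencySampleFlow(10000L))
    .take(3)

// Run the same blueprint twice; each run gets its own lastValidTime.
val firstRun = Await.result(slow.runWith(Sink.seq), 5.seconds)
val secondRun = Await.result(slow.runWith(Sink.seq), 5.seconds)

Await.ready(system.terminate(), 10.seconds)
```

    Both runs yield 0, 10000 and 20000. With the filter-based flow, the second run would instead begin after the last time accepted by the first.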
    

    Now we can attach a Flow with a long sampling interval (e.g. 10 seconds) to a Source that ticks more often (e.g. every second):

    val slowFrequency : MillisInterval = 10 * second
    
    //simulatedClockFromEpochSource ticks every 1 second
    //frequencySampleFlow only passes one tick every 10 seconds through
    val slowSource = 
      simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)