
Replay data collected live to simulate real traffic delays and ordering of messages


I have two input streams, both producing instances of the following class:

case class ReplayData(timestamp:Long, payload:Any)

Stream 1

1, payload1

1000, payload3

Stream 2

400, payload2

1500, payload4

I would like to implement a replay mechanism that pushes elements downstream ordered by the timestamp carried on every element.

It will simulate live scenarios from production.

This mechanism would need to obey the delays between messages. For example, the first message sent is payload1 (the starting point); payload2 from Stream 2 should be sent about 400 ms later (the difference between its timestamp and the initial message's timestamp), and so on.
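To make the timing concrete, here is a minimal sketch of the schedule implied by the example streams. The names `ReplaySchedule` and `offsets` are made up for illustration; each offset is simply the element's timestamp minus the earliest timestamp.

```scala
object ReplaySchedule {
  final case class ReplayData(timestamp: Long, payload: Any)

  // Sort all elements by timestamp and pair each payload with its
  // offset (in ms) from the earliest timestamp.
  def offsets(streams: Seq[Seq[ReplayData]]): Seq[(Any, Long)] = {
    val all = streams.flatten.sortBy(_.timestamp)
    all.headOption.toSeq.flatMap { first =>
      all.map(d => d.payload -> (d.timestamp - first.timestamp))
    }
  }

  def main(args: Array[String]): Unit = {
    val stream1 = Seq(ReplayData(1, "payload1"), ReplayData(1000, "payload3"))
    val stream2 = Seq(ReplayData(400, "payload2"), ReplayData(1500, "payload4"))
    offsets(Seq(stream1, stream2)).foreach { case (p, ms) => println(s"$p after $ms ms") }
  }
}
```

With the example data the offsets come out as 0, 399, 999 and 1499 ms.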

I could do that quite easily with DelayQueue, whose usage is explained in this SO thread:

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.

The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.

Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.

For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
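A rough sketch of that DelayQueue approach, using only the JDK, could look like the following. `DelayedReplay` and `DelayQueueReplay.replay` are hypothetical names, and the sketch assumes all data is available up front and that blocking the calling thread is acceptable.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

object DelayQueueReplay {
  final case class ReplayData(timestamp: Long, payload: Any)

  // Hypothetical wrapper: the element becomes available once
  // (timestamp - firstTimestamp) ms have elapsed since startNanos.
  final class DelayedReplay(val data: ReplayData, firstTimestamp: Long, startNanos: Long)
      extends Delayed {
    private val dueNanos =
      startNanos + TimeUnit.MILLISECONDS.toNanos(data.timestamp - firstTimestamp)

    override def getDelay(unit: TimeUnit): Long =
      unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS)

    override def compareTo(other: Delayed): Int =
      java.lang.Long.compare(
        getDelay(TimeUnit.NANOSECONDS),
        other.getDelay(TimeUnit.NANOSECONDS))
  }

  // Puts every element into the queue, then takes them back one by one;
  // take() blocks until the head's delay has expired.
  def replay(streams: Seq[Seq[ReplayData]])(emit: ReplayData => Unit): Unit = {
    val all = streams.flatten
    if (all.nonEmpty) {
      val first = all.map(_.timestamp).min
      val start = System.nanoTime()
      val queue = new DelayQueue[DelayedReplay]()
      all.foreach(d => queue.put(new DelayedReplay(d, first, start)))
      (1 to all.size).foreach(_ => emit(queue.take().data))
    }
  }
}
```

Calling `DelayQueueReplay.replay(Seq(stream1, stream2))(println)` would print the elements in timestamp order, each one no earlier than its offset from the first timestamp.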

I'm trying to figure out how to do that in Akka Streams, but I'm having trouble finding anything that would solve this for me.

I was looking at mergeSorted as a way to merge two streams and order them based on some function.

It seems to fit, more or less, the purpose of ordering by a custom function.

However, I'm not sure how to handle the delays between elements based on the timestamp property.
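For the ordering half of the problem, a minimal sketch with mergeSorted might look like this. It assumes each input stream is already sorted by timestamp, which mergeSorted relies on; the object name `MergeSortedSketch` and the helper `merge` are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MergeSortedSketch {
  final case class ReplayData(timestamp: Long, payload: Any)

  // mergeSorted emits the smaller of the two waiting heads according to this Ordering.
  implicit val byTimestamp: Ordering[ReplayData] =
    Ordering.by[ReplayData, Long](_.timestamp)

  // Merges two timestamp-sorted lists into one timestamp-sorted sequence.
  def merge(a: List[ReplayData], b: List[ReplayData])(
      implicit mat: ActorMaterializer): Future[Seq[ReplayData]] =
    Source(a).mergeSorted(Source(b)).runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val stream1 = List(ReplayData(1, "payload1"), ReplayData(1000, "payload3"))
    val stream2 = List(ReplayData(400, "payload2"), ReplayData(1500, "payload4"))

    Await.result(merge(stream1, stream2), 5.seconds).foreach(println)
    sys.terminate()
  }
}
```

This only interleaves the two streams in timestamp order; it does not introduce any delay between elements.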

Using plain old Akka, I could use the scheduler: read the data, order it, and schedule every element to be sent once its time has passed.
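As a sketch of that plain-Akka idea, assuming all the data fits in memory: sort the elements, then schedule each one with `scheduleOnce` at its offset from the first timestamp. `SchedulerReplay.schedule` is a hypothetical helper, and `emit` runs on the system dispatcher, so it has to be thread-safe.

```scala
import akka.actor.{ActorSystem, Cancellable}

import scala.concurrent.duration._

object SchedulerReplay {
  final case class ReplayData(timestamp: Long, payload: Any)

  // Schedules one task per element; the delay is the element's offset
  // from the earliest timestamp. Returns the handles so tasks can be cancelled.
  def schedule(data: Seq[ReplayData])(emit: ReplayData => Unit)(
      implicit sys: ActorSystem): Seq[Cancellable] = {
    import sys.dispatcher
    val sorted = data.sortBy(_.timestamp)
    sorted.headOption.toList.flatMap { first =>
      sorted.map { d =>
        sys.scheduler.scheduleOnce((d.timestamp - first.timestamp).millis)(emit(d))
      }
    }
  }
}
```

Because every element gets its own timer, elements whose timestamps are very close together are not guaranteed to be emitted in strict order.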


Solution

  • I don't remember anything in akka-streams that can delay messages out of the box with a custom delay for each message. After all, the idea behind akka-streams is reactive programming. I know of only two general ways to work around your problem (assuming that you have already merged the two sources):

    1. Flow.mapAsync - in this case it's entirely up to you to return a Future that completes after some delay, e.g.:

      import java.time.LocalDateTime
      import java.util.concurrent.Executors
      
      import akka.NotUsed
      import akka.actor.ActorSystem
      import akka.pattern.after
      import akka.stream.ActorMaterializer
      import akka.stream.scaladsl.Source
      
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration._
      import scala.concurrent.{ExecutionContext, Future}
      
      object Application extends App {
      
        implicit val sys: ActorSystem = ActorSystem()
        implicit val mat: ActorMaterializer = ActorMaterializer()
      
        case class SomeEntity(time: Int, value: Int)
        val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
      
        val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
        val scheduler = sys.scheduler
      
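        // Each element is delayed by `time` ms, counted from the moment mapAsync picks it up.
        // With parallelism 10, all six elements start their timers almost at once.
        val f = source
          .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
          .runForeach(se => println(s"${LocalDateTime.now()} -> $se"))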
      
        f.onComplete(_ => sys.terminate())
      }
      
    2. Your use case (a simulation, after all) might not actually be that strict, so you could use Flow.throttle. It is not as simple or precise as the first solution, but it performs much better, because it uses a lightweight token bucket model to control the output rate.

      import java.time.LocalDateTime
      
      import akka.NotUsed
      import akka.actor.ActorSystem
      import akka.stream.ActorMaterializer
      import akka.stream.scaladsl.Source
      
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration._
      
      object Application extends App {
      
        implicit val sys: ActorSystem = ActorSystem()
        implicit val mat: ActorMaterializer = ActorMaterializer()
      
        case class SomeEntity(time: Int, value: Int)
      
        val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
      
      
        // `_.time` is the cost of each element in tokens; tokens are replenished at 1 per millisecond.
        val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
          println(s"${LocalDateTime.now()} -> $se")
        })
      
        future.onComplete(_ => sys.terminate())
      }
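Both examples above treat `time` as a delay rather than as an absolute timestamp. A sketch that paces an already merged, timestamp-ordered source by the difference to the first element's timestamp could combine statefulMapConcat with mapAsync and `akka.pattern.after`. The helper name `paced` is hypothetical, and the timing is only as precise as the Akka scheduler allows.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TimestampPacedReplay {
  final case class ReplayData(timestamp: Long, payload: Any)

  // Hypothetical helper: holds every element back until its offset from the
  // first element's timestamp has elapsed on the wall clock.
  def paced(source: Source[ReplayData, NotUsed])(
      implicit sys: ActorSystem): Source[ReplayData, NotUsed] = {
    import sys.dispatcher
    source
      .statefulMapConcat { () =>
        // (wall-clock nanos, timestamp) captured when the first element arrives
        var origin: Option[(Long, Long)] = None
        data => {
          val (startNanos, firstTs) = origin.getOrElse {
            val o = (System.nanoTime(), data.timestamp)
            origin = Some(o)
            o
          }
          val dueNanos = startNanos + (data.timestamp - firstTs) * 1000000L
          List((data, dueNanos))
        }
      }
      // One timer in flight at a time; the delay is whatever is left until the due time.
      .mapAsync(1) { case (data, dueNanos) =>
        val remaining = math.max(0L, dueNanos - System.nanoTime()).nanos
        after(remaining, sys.scheduler)(Future.successful(data))
      }
  }

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val merged = Source(List(
      ReplayData(1, "payload1"), ReplayData(400, "payload2"),
      ReplayData(1000, "payload3"), ReplayData(1500, "payload4")))

    val start = System.nanoTime()
    val done = paced(merged).runForeach { d =>
      println(s"${(System.nanoTime() - start) / 1000000L} ms -> $d")
    }
    Await.ready(done, 10.seconds)
    sys.terminate()
  }
}
```

Because each due time is computed against a fixed origin, a late element does not push back the ones after it. Expect jitter on the order of the scheduler's tick duration (10 ms by default).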