Given two input streams, both producing object instances defined as
case class ReplayData(timestamp: Long, payload: Any)
Stream 1
1, payload1
1000, payload3
Stream 2
400, payload2
1500, payload4
I would like to implement a replay mechanism that pushes elements downstream, ordered by the timestamp that every element carries.
It will simulate live scenarios from production.
The mechanism needs to respect the delays between messages. For example, the first message sent is payload1 (the starting point), payload2 from Stream 2 should be sent about 400 ms later (the difference between its timestamp and the initial message's timestamp), and so on.
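To make the required timing concrete, here is a minimal plain-Scala sketch (no Akka) that takes the sample records, already merged into timestamp order, and computes two things: how long after the first record each one is due, and how long to wait between consecutive records. The `ReplayTiming` object and its helpers are hypothetical names used only for this illustration.

```scala
// ReplayData as defined in the question.
case class ReplayData(timestamp: Long, payload: Any)

object ReplayTiming {
  // How long after the first record each record is due.
  // Assumes `records` is non-empty and already sorted by timestamp.
  def offsetsFromStart(records: Seq[ReplayData]): Seq[Long] = {
    val start = records.head.timestamp
    records.map(_.timestamp - start)
  }

  // How long to wait between each pair of consecutive records.
  def gapsBetween(records: Seq[ReplayData]): Seq[Long] =
    records.zip(records.drop(1)).map { case (prev, next) => next.timestamp - prev.timestamp }
}
```

For the four sample records (timestamps 1, 400, 1000, 1500) the offsets are 0, 399, 999 and 1499 ms, and the gaps are 399, 600 and 500 ms. A replay driven by a clock started at the first record needs the offsets; one that waits between elements needs the gaps.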
I could do this quite easily with a DelayQueue, whose usage is explained in this SO thread:
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.
Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
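Outside Akka, the DelayQueue approach mentioned above could look roughly like the sketch below. It rests on a few assumptions: `DelayedReplay` and `DelayQueueReplay.replay` are hypothetical names, each record is released `timestamp - earliestTimestamp` ms after the replay starts, and `emit` runs on the calling thread, which blocks until the last record is due.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// ReplayData as defined in the question.
case class ReplayData(timestamp: Long, payload: Any)

// Hypothetical wrapper: becomes available `offsetMs` after `baseNanos` (System.nanoTime).
final class DelayedReplay(val data: ReplayData, baseNanos: Long, offsetMs: Long) extends Delayed {
  private val dueNanos = baseNanos + TimeUnit.MILLISECONDS.toNanos(offsetMs)

  override def getDelay(unit: TimeUnit): Long =
    unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS)

  // Earlier due time first; DelayQueue uses this ordering to pick its head.
  override def compareTo(other: Delayed): Int = other match {
    case o: DelayedReplay => java.lang.Long.compare(dueNanos, o.dueNanos)
    case _ => java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
  }
}

object DelayQueueReplay {
  // Hypothetical helper: replays all records of all streams in timestamp order,
  // each released (timestamp - earliest timestamp) ms after the call starts.
  // Blocks the calling thread until the last record has been emitted.
  def replay(streams: Seq[Seq[ReplayData]])(emit: ReplayData => Unit): Unit = {
    val all = streams.flatten
    if (all.nonEmpty) {
      val earliest = all.map(_.timestamp).min
      val base = System.nanoTime()
      val queue = new DelayQueue[DelayedReplay]()
      all.foreach(r => queue.put(new DelayedReplay(r, base, r.timestamp - earliest)))
      // take() blocks until the head's delay has expired, so records come out in due order.
      for (_ <- all.indices) emit(queue.take().data)
    }
  }
}
```

Because the queue orders by due time, the two input streams do not need to be merged beforehand; that is what makes this approach convenient outside of Akka Streams.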
I'm trying to figure out how to do this in Akka Streams, but I'm having trouble finding something that solves it for me.
I was looking at mergeSorted as a way to merge the two streams and order them based on some function.
It seems to more or less fit the purpose of ordering by a custom function.
What I'm not sure about is how to handle the delays between elements based on the timestamp property.
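mergeSorted behaves like the merge step of merge sort: it repeatedly emits the smallest of the elements available from its two inputs, so the output is only fully ordered if each input is already ordered by timestamp (true for the sample streams). In Akka Streams it takes an implicit `Ordering`, e.g. `Ordering.by[ReplayData, Long](_.timestamp)`. The plain-Scala sketch below, built around a hypothetical `mergeByTimestamp` helper, shows the same merge on in-memory lists; it illustrates the semantics and is not Akka's implementation.

```scala
import scala.annotation.tailrec

// ReplayData as defined in the question.
case class ReplayData(timestamp: Long, payload: Any)

object SortedMerge {
  // Merges two lists that are each already sorted by timestamp.
  // On equal timestamps the element from `left` is taken first.
  def mergeByTimestamp(left: List[ReplayData], right: List[ReplayData]): List[ReplayData] = {
    @tailrec
    def loop(l: List[ReplayData], r: List[ReplayData], acc: List[ReplayData]): List[ReplayData] =
      (l, r) match {
        case (Nil, _) => acc.reverse ::: r
        case (_, Nil) => acc.reverse ::: l
        case (lh :: lt, rh :: _) if lh.timestamp <= rh.timestamp => loop(lt, r, lh :: acc)
        case (_, rh :: rt) => loop(l, rt, rh :: acc)
      }
    loop(left, right, Nil)
  }
}
```

Merging Stream 1 and Stream 2 this way yields payload1, payload2, payload3, payload4. Note that ordering alone says nothing about timing; the delays still have to be applied by a separate stage.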
Using plain old Akka, I could use the scheduler to read the data, order it, and schedule every element to be sent when its time comes.
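The scheduler idea can be sketched as: sort the records, then schedule each one at its offset from the earliest timestamp. To keep the sketch dependency-free it uses java.util.concurrent's `ScheduledExecutorService` as a stand-in for Akka's scheduler (with classic Akka the corresponding call would be `system.scheduler.scheduleOnce(delay)(...)`); `SchedulerReplay.replay` is a hypothetical helper.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// ReplayData as defined in the question.
case class ReplayData(timestamp: Long, payload: Any)

object SchedulerReplay {
  // Hypothetical helper: schedules every record (timestamp - earliest timestamp) ms
  // from now and blocks until all of them have been emitted. The single-threaded
  // executor runs the tasks one at a time, in due-time order.
  def replay(records: Seq[ReplayData])(emit: ReplayData => Unit): Unit =
    if (records.nonEmpty) {
      val sorted = records.sortBy(_.timestamp)
      val earliest = sorted.head.timestamp
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      val done = new CountDownLatch(sorted.size)
      try {
        sorted.foreach { r =>
          scheduler.schedule(new Runnable {
            def run(): Unit = try emit(r) finally done.countDown()
          }, r.timestamp - earliest, TimeUnit.MILLISECONDS)
        }
        done.await()
      } finally scheduler.shutdown()
    }
}
```

A single thread keeps emissions strictly ordered, at the cost that a slow `emit` delays every later record.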
I don't remember anything in Akka Streams that can delay messages out of the box with a custom delay for each message; after all, the idea behind Akka Streams is reactive programming. I know of only two general ways to overcome your problem (assuming you have already merged the two sources):
Flow.mapAsync: in this case it is entirely up to you to return a Future that completes after some delay, e.g.:
import java.time.LocalDateTime
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
object Application extends App {
  implicit val sys: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  // The actor system's dispatcher replaces an extra fixed thread pool, whose
  // non-daemon threads would keep the JVM alive after sys.terminate().
  implicit val ec: ExecutionContext = sys.dispatcher
  case class SomeEntity(time: Int, value: Int)
  val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
  val scheduler = sys.scheduler
  val start = System.nanoTime()
  val f = source
    .mapAsync(10) { se =>
      // Wait until `time` ms after `start`, not `time` ms after the element arrives:
      // with more elements than the parallelism (10), later elements enter mapAsync
      // only as earlier ones are emitted, so a fixed per-element delay would drift.
      val elapsedMs = (System.nanoTime() - start) / 1000000
      after((se.time - elapsedMs).max(0L).milliseconds, scheduler)(Future.successful(se))
    }
    .runForeach(se => println(s"${LocalDateTime.now()} -> $se"))
  f.onComplete(_ => sys.terminate())
}
It might be that your use case (a simulation, after all) is not actually that strict, in which case you could use Flow.throttle. It is not as simple or as precise as the first solution, but it is much more performant, because it uses a lightweight token-bucket model to control the output rate.
import java.time.LocalDateTime
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Application extends App {
  implicit val sys: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  case class SomeEntity(time: Int, value: Int)
  val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
  val future = source
    // Pair each element with the gap (ms) since the previous one. The throttle
    // cost must be this gap, not the absolute time, or the delays add up.
    .statefulMapConcat { () =>
      var previous = 0
      se => {
        val gap = se.time - previous
        previous = se.time
        List((se, gap))
      }
    }
    // 1 token per millisecond; each element costs `gap` tokens.
    .throttle(1, 1.millisecond, (p: (SomeEntity, Int)) => p._2)
    .runForeach { case (se, _) => println(s"${LocalDateTime.now()} -> $se") }
  future.onComplete(_ => sys.terminate())
}
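To see why throttle is only approximate, here is a simplified token-bucket model. It is an assumption made for illustration, not Akka's actual Throttle code: the bucket holds at most `capacity` tokens, refills at 1 token per ms, starts full, and every element is already waiting upstream. An element costing c tokens is released as soon as c tokens are available. `TokenBucketModel.releaseTimes` is a hypothetical helper.

```scala
object TokenBucketModel {
  // Simplified token bucket, for illustration only: at most `capacity` tokens,
  // refilled at 1 token per ms, starting full. Every element is assumed to be
  // waiting upstream already, so time only passes while waiting for tokens.
  // Returns the release time (ms from start) of each element, given its cost.
  def releaseTimes(costs: Seq[Long], capacity: Long): List[Long] = {
    val (_, _, released) = costs.foldLeft((0L, capacity, List.empty[Long])) {
      case ((now, tokens, acc), cost) =>
        val wait = math.max(0L, cost - tokens) // ms until enough tokens have accumulated
        val releaseAt = now + wait
        (releaseAt, tokens + wait - cost, releaseAt :: acc)
    }
    released.reverse
  }
}
```

With the gaps of the mapAsync example (100, 100, 200, 600, 100, 300 ms) and capacity 0, the release times match the timestamps exactly (100, 200, 400, 1000, 1100, 1400). With capacity 100, the initially full bucket lets the first element out immediately and every later release comes 100 ms early. Charging absolute timestamps instead of gaps makes the waits accumulate: costs 100, 200, 400 are released at 100, 300 and 700 ms. In Akka the burst size can be set explicitly through the `maximumBurst` parameter of the longer `throttle` overload.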