Search code examples
scala, akka, akka-stream

Varying the time between each message using Scala throttler


I'm learning Akka and have written the code below to throttle the number of messages sent to the TradeAction actor. A max of 3 messages is sent every second. Can the throttler be amended so that the time between each message can be set? For example, delay by 2 seconds between message1 and message2, and by 1 second between message2 and message3.

import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import systemconfig.ActorSystemConfig

import scala.concurrent.duration.DurationInt

/** A trade instruction.
  *
  * @param side the side of the trade as plain text, e.g. "Buy" or "Sell"
  */
final case class Action(side: String)

/** Demo entry point: pushes trade sides ("Buy"/"Sell") through a throttled
  * Akka stream into a [[PlaceTrade.TradeAction]] actor.
  */
object PlaceTrade {

  val actorSystem: ActorSystem = ActorSystem("firstActorSystem")
  println(actorSystem.name)

  object TradeAction {
    /** Props for creating a [[TradeAction]] actor.
      *
      * @param action name handed to the actor's constructor
      */
    def props(action: String): Props = Props(new TradeAction(action))
  }

  /** Actor that reacts to the string messages "Buy" and "Sell" and silently
    * ignores everything else.
    *
    * @param actorName not read anywhere in this class
    */
  class TradeAction(actorName: String) extends Actor {
    override def receive: Receive = {
      case "Buy"  => executeTrade("Buy")
      case "Sell" => executeTrade("Sell")
      case _      => // any other message is ignored
    }

    /** Issues the HTTP request and prints its status code followed by the side. */
    private def executeTrade(side: String): Unit = {
      // NOTE(review): requests.get looks like a synchronous call; if so it blocks
      // the actor's thread for the duration of the request - confirm this is acceptable.
      val response = requests.get("http://www.google.com")
      println(s"r status code is ${response.statusCode}")
      println(side)
      println("")
    }
  }

  // NOTE(review): the materializer is built from ActorSystemConfig.getActorSystem,
  // while the TradeAction actor is created on `actorSystem` above - confirm both
  // refer to the same actor system.
  implicit val materializer: ActorMaterializer =
    ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  /** Materializes a stream that forwards every message sent to the returned
    * ActorRef on to `ac`, rate-limited by `throttle`.
    *
    * @param ac the actor that receives the throttled messages
    * @return the ActorRef acting as the entry point of the stream
    */
  def getThrottler(ac: ActorRef): ActorRef =
    Source
      // Buffers up to 1000 pending messages; newer ones are dropped when the buffer is full.
      .actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      // throttle(elements, per): lets at most 1 element through per 3 seconds.
      .throttle(1, 3.second)
      // NotUsed is the message sent to `ac` when the stream completes.
      .to(Sink.actorRef(ac, NotUsed))
      .run()

  def main(args: Array[String]): Unit = {
    val tradeAction = actorSystem.actorOf(TradeAction.props("TradeAction"))
    val throttler = getThrottler(tradeAction)

    val actions = List(Action("Buy"), Action("Buy"), Action("Buy"), Action("Sell"))
    // Only the side string is sent, which is what TradeAction.receive matches on.
    actions.foreach(action => throttler ! action.side)
  }

}

Solution

  • You can accomplish this with delayWith, which lets a (potentially stateful) strategy decide how long to delay each element (without re-ordering elements). For example:

    import akka.stream.scaladsl.{ DelayStrategy, DelayOverflowStrategy }
    import scala.concurrent.duration.FiniteDuration
    
    /** Creates a fresh, stateful delay strategy.
      *
      * Successive calls to `nextDelay` on one instance return 0, 2 and 1 seconds,
      * and 0 seconds for every call after that. The element itself is not inspected.
      */
    def decliningDelay(): DelayStrategy[Any] =
      new DelayStrategy[Any] {
        // None until the first element has been seen; afterwards the delay (in
        // seconds) to hand out for the next element.
        var nextDelaySeconds: Option[Int] = None

        def nextDelay(elem: Any): FiniteDuration =
          nextDelaySeconds match {
            case None =>
              // First element: no delay, and arm the countdown at 2 seconds.
              nextDelaySeconds = Some(2)
              0.seconds
            case Some(delay) if delay > 0 =>
              // Hand out the current delay and shrink it by one second for the next element.
              nextDelaySeconds = Some(delay - 1)
              delay.seconds
            case _ => 0.seconds
          }
      }
    
    /** Materializes a stream whose entry point is the returned ActorRef; messages
      * sent to it pass through `delayWith` (using a new `decliningDelay()` strategy)
      * before being forwarded to `ac`.
      */
    def getThrottler(ac: ActorRef): ActorRef = {
      // Entry point: buffers up to 1000 messages, dropping new ones when full.
      val inbox = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      // The supplier is invoked to obtain the strategy, so state is not shared across calls here.
      val paced = inbox.delayWith(() => decliningDelay(), DelayOverflowStrategy.backpressure)
      // NotUsed is the message delivered to `ac` when the stream completes.
      val forwardToActor = Sink.actorRef(ac, NotUsed)
      paced.to(forwardToActor).run()
    }
    

    You can combine delayWith with throttle; I would probably put delayWith before throttle.