How to continually call a REST service using non-blocking code with Akka

I'm accessing data from a REST endpoint :


To access the data once per second, I use an infinite loop (while(true) { ... }) that sends a message to the actor once per second. Each message triggers the REST request.

The actor to access the data is:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  override def receive: PartialFunction[Any, Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("").mkString // blocking call
        println("rData : "+rData)

      } catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

I start the application using:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }
  }
}



object Throttlers {

  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.actorRef(priceDataActor, NotUsed))
    .run() // materialize the stream so the ActorRef can be messaged
}

How can I run the following code asynchronously instead of blocking in an infinite loop?

throttler ! ProductTickerRestActor.StringData("test")

Also, the throttler may be redundant in this case, since I'm already throttling requests within the loop.


  • I would just use Akka Streams for this, along with Akka HTTP. Using Akka 2.6.x, something along these lines would be sufficient for 1 request/second:

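    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration._

    object HTTPRepeatedly {
      implicit val system: ActorSystem = ActorSystem()
      import system.dispatcher // ExecutionContext for the Future callbacks below

      val sourceFromHttp: Source[String, NotUsed] =
        Source.repeat("test") // Not sure what "test" is actually used for here...
          .throttle(1, 1.second) // at most one element per second
          .map { str =>
            HttpRequest(uri = "")
          }.mapAsync(1) { req =>
            // Assumed completion of the truncated body: send the request, then read the entity as a String
            Http().singleRequest(req)
              .flatMap(_.entity.toStrict(1.minute))
              .map(_.data.utf8String)
          }
    }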
    Then you could, for instance, run it like this (for simplicity, put this in a main within HTTPRepeatedly so the implicits are in scope, etc.):

    import akka.Done
    import akka.stream.scaladsl.Sink
    import scala.concurrent.Future

    val done: Future[Done] =
      sourceFromHttp
        .take(10) // stop after 10 requests
        .runWith(Sink.foreach { rData => println(s"rData: $rData") })
    scala.concurrent.Await.result(done, 11.minute)
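
    If you would rather keep an actor as the receiver, the while(true) loop can probably be replaced by the classic scheduler, which sends the message periodically without tying up a thread between sends. The following is only a minimal sketch, assuming Akka 2.6.x classic actors. TickerActor, ScheduledTicker and start are hypothetical names standing in for the question's actor and application, and the actor here only prints what it receives rather than calling the REST endpoint.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

// Hypothetical stand-in for the question's actor: it only prints the message.
object TickerActor {
  final case class StringData(data: String)
}

class TickerActor extends Actor {
  def receive: Receive = {
    case TickerActor.StringData(data) => println(s"would fetch now, payload: $data")
  }
}

object ScheduledTicker {
  // Asks the scheduler to send one message per second to `target`.
  // No caller thread is blocked; cancel the returned Cancellable to stop.
  def start(system: ActorSystem, target: ActorRef): Cancellable = {
    import system.dispatcher // ExecutionContext used by the scheduler
    system.scheduler.scheduleWithFixedDelay(
      Duration.Zero, // first message immediately
      1.second,      // then one message per second
      target,
      TickerActor.StringData("test")
    )
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ticker")
    import system.dispatcher
    val actor = system.actorOf(Props(new TickerActor), "ticker-actor")
    val task = start(system, actor)

    // Stop the demo after about five seconds, again without blocking main.
    system.scheduler.scheduleOnce(5.seconds) {
      task.cancel()
      system.terminate()
    }
  }
}
```

    With this in place the Source.actorRef throttler is no longer needed, since the scheduler already fixes the rate. Note that a blocking HTTP call inside receive would still occupy a dispatcher thread for its duration, so the Akka HTTP approach above remains the non-blocking option for the request itself.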