Search code examples
scalaakkathrottling

How to continually call a REST service using non blocking code with Akka


I'm accessing data from a REST endpoint :

"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"

To access the data once per second I use an infinite loop while(true) { to invoke a message send to the actor once per second which begins the process of invoking the REST request:

The actor to access the data is:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  
  override def receive: PartialFunction[Any, Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : "+rData)

      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

I start the application using:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }

}

Throttler:

object Throttlers {


  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.actorRef(priceDataActor, NotUsed))
    .run()
}

How to run the following code asynchronously instead of blocking using an infinite loop? :

throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000) 

Also, the throttler, in this case, maybe redundant as I'm throttling requests within the loop regardless.


Solution

  • I would just use Akka Streams for this along with Akka HTTP. Using Akka 2.6.x, something along these lines would be sufficient for 1 request/second

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.stream.scaladsl._
    
    import scala.concurrent.duration._
    
    object HTTPRepeatedly {
      implicit val system = ActorSystem()
      import system.dispatcher
    
      val sourceFromHttp: Source[String, NotUsed] =
        Source.repeated("test") // Not sure what "test" is actually used for here...
          .throttle(1, 1.second)
          .map { str =>
            HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
          }.mapAsync(1) { req =>
            Http().singleRequest(req)
          }.mapAsync(1)(_.entity.toStrict(1.minute))
          .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
    }
    

    Then you could, for instance (for simplicity, put this in a main within HTTPRepeatedly so the implicits are in scope etc.)

    val done: Future[Done] =
      sourceFromHttp
        .take(10) // stop after 10 requests
        .runWith(Sink.foreach { rData => println(s"rData: $rData") })
    
    scala.concurrent.Await.result(done, 11.minute)
    
    system.terminate()