How can I run parSequenceUnordered of Monix, and handle the results of each task?


I am currently working on implementing client-side HTTP requests to an API, and decided to explore sttp and Monix for this task. As I am new to Monix, I am still not sure how to run tasks and retrieve their results. My objective is to have a sequence of HTTP requests that I can run in parallel, then parse and load the results (call in parallel -> parse -> load).

Below is a snippet of what I have tried so far:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val tasks = Seq(r1).map(i => Task(i))
    Task.parSequenceUnordered(tasks).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

  postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

My confusion is probably a simple one. How can I run the Task.parSequenceUnordered that I have created, and handle the results (parse the HTTP responses) of the tasks within the sequence?

Nice to have: out of curiosity, is it possible to naively introduce rate-limiting/throttling when processing the Task sequence of requests? I am not really looking for building something sophisticated. It could be as simple as spacing out batches of requests. Wondering if Monix has a helper for that already.


Solution

  • Thanks to Oleg Pyzhcov and the monix gitter community for helping me figure this one out.

    Quoting Oleg here:

    Since you're using backend with monix support already, the type of r1 is Task[Response[Either[String,String]]]. So when you're doing Seq(r1).map(i => Task(i)), you make it a sequence of tasks that don't do anything except give you other tasks that give you result (the type would be Seq[Task[Task[Response[...]]]]). Your code then parallelizes the outer layer, tasks-that-give-tasks, and you get the tasks that you started with as the result. You only need to process a Seq(r1) for it to run requests in parallel.
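
    The nesting described above can be reproduced without any network calls. The following is a minimal sketch, assuming Monix 3.2 or later (where parSequenceUnordered is available); fakeRequest is a hypothetical stand-in for an sttp request and is not part of either library.

    ```scala
    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object NestedTaskSketch {
      // Hypothetical stand-in for an sttp call: a Task that yields a response body.
      def fakeRequest(id: Int): Task[String] = Task.eval(s"response-$id")

      val requests: Seq[Task[String]] = Seq(fakeRequest(1), fakeRequest(2))

      // Wrapping each task again nests it: Seq[Task[Task[String]]].
      // Running this only hands back the inner, still-unexecuted tasks.
      val nested: Task[List[Task[String]]] =
        Task.parSequenceUnordered(requests.map(t => Task(t)))

      // Passing the tasks as they are runs them and collects their results.
      val flat: Task[List[String]] = Task.parSequenceUnordered(requests)

      def main(args: Array[String]): Unit = {
        val results = Await.result(flat.runToFuture, 5.seconds)
        // Unordered: sort before printing to get a stable output.
        println(results.sorted)
      }
    }
    ```

    The type annotations on nested and flat make the difference visible: only flat has the request results as its element type.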

    If you're using IntelliJ, you can press Alt + = to see the type of selection - it helps if you can't tell the type from the code alone (but it gets better with experience).

    As for rate-limiting, we have parSequenceN that lets you set a limit to parallelism. Note that unordered only means that you get a slight performance advantage at the cost of results being in random order in the output; the tasks are executed non-deterministically anyway.
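
    To make the two throttling options concrete, here is a rough sketch, again assuming Monix 3.2 or later. The first value uses parSequenceN as suggested above. The second is only one possible way to space out batches (it is not something the quote proposes): it groups the requests and pauses after each group with delayResult. fakeRequest, the batch size of 2 and the 200 ms pause are all illustrative assumptions.

    ```scala
    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object ThrottleSketch {
      // Hypothetical stand-in for an sttp call.
      def fakeRequest(id: Int): Task[String] = Task.eval(s"response-$id")

      val requests: List[Task[String]] = (1 to 6).toList.map(fakeRequest)

      // Option 1: cap concurrency at 2; results keep the input order.
      val limited: Task[List[String]] = Task.parSequenceN(2)(requests)

      // Option 2: naive batching. Task.traverse walks the batches one after
      // another; each batch runs in parallel and then waits 200 ms before
      // its result is emitted, so the next batch starts after the pause.
      val batched: Task[List[String]] =
        Task
          .traverse(requests.grouped(2).toList) { batch =>
            Task.parSequence(batch).delayResult(200.millis)
          }
          .map(_.flatten)

      def main(args: Array[String]): Unit = {
        println(Await.result(limited.runToFuture, 5.seconds))
        println(Await.result(batched.runToFuture, 5.seconds))
      }
    }
    ```

    Note that parSequenceN limits how many tasks run at once, not how many start per second, and that the batching variant also pauses after the final batch. For anything beyond this kind of coarse spacing, a dedicated rate limiter would likely be a better fit.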

    I ended up with a (simplified) implementation that looks something like this:

    import sttp.client._
    import sttp.client.asynchttpclient.monix._
    import monix.eval.Task
    
    object SO extends App {
    
      val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
        val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
          .header("accept", "application/json")
          .response(asString)
          .body()
          .send()
    
        val items = Seq(r1.map(x => x.body))
        Task.parSequenceN(1)(items).guarantee(backend.close())
      }
      
      import monix.execution.Scheduler.Implicits.global
    
      postTask.runToFuture.foreach(println)
    }
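
    For the "handle the results of each task" part of the question, one approach is to map a handler over each task before sequencing, as r1.map(x => x.body) does above. The sketch below assumes Monix 3.2 or later and uses hypothetical stand-ins (fakeBody, parse) instead of real sttp calls; the Either[String, String] shape mirrors what asString produces according to the quoted explanation.

    ```scala
    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object HandleResultsSketch {
      // Hypothetical stand-in for `request.send().map(_.body)`:
      // Right carries a successful body, Left an error body.
      def fakeBody(id: Int): Task[Either[String, String]] =
        Task.eval(if (id % 2 == 0) Right(s"payload-$id") else Left(s"error-$id"))

      // Hypothetical parsing step; a real one would decode JSON here.
      def parse(body: Either[String, String]): String = body match {
        case Right(payload) => s"parsed($payload)"
        case Left(error)    => s"failed($error)"
      }

      // Attach the handler to each task first, then run them in parallel.
      val handled: Task[List[String]] =
        Task.parSequenceUnordered(List(1, 2, 3).map(id => fakeBody(id).map(parse)))

      def main(args: Array[String]): Unit =
        // Unordered results: sort for a stable printout.
        Await.result(handled.runToFuture, 5.seconds).sorted.foreach(println)
    }
    ```

    Because the handler is part of each task, parsing also happens in parallel, and the final Task already contains the processed values.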