
scala ZIO foreachPar


I'm new to parallel programming and ZIO. I'm trying to fetch data from an API using parallel requests.

import sttp.client._
import zio.{Task, ZIO}


ZIO.foreach(files) { file =>
    getData(file)
    Task(file.getName)
  }


def getData(file: File) = {

  val data: String = readData(file)
  val request = basicRequest.body(data).post(uri"$url")
      .headers(content -> "text", char -> "utf-8")
      .response(asString)

  implicit val backend: SttpBackend[Identity, Nothing, NothingT] = HttpURLConnectionBackend()
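  // With the Identity backend, `send()` blocks the calling thread until the response arrives.
  // `asString` yields an `Either`: `Right` holds the body, `Left` holds the error text.
  request.send().body match {
    case Right(value) =>
      val src = new PrintWriter(new File(filename))
      src.write(value)
      src.close()
    case Left(error) => // log error
  }
}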

When I execute the program sequentially, it works as expected. When I try to run it in parallel by changing ZIO.foreach to ZIO.foreachPar, the program terminates prematurely. I assume I'm missing something basic here; any help figuring out the issue is appreciated.


Solution

  • Generally speaking, I wouldn't recommend mixing synchronous blocking code, as you have here, with the asynchronous non-blocking code that ZIO is primarily designed for. There are some great talks out there on how to use ZIO effectively with the "world", so to speak.

    There are two key points I would make. First, ZIO lets you manage resources effectively by attaching allocation and finalization steps to them. Second, "effects", which we could describe as "things which actually interact with the world", should be wrapped in the tightest scope possible*.
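    As a separate, minimal illustration of these two points (this is not the asker's code), the sketch below assumes ZIO 1.0.x. The `events` log and the string "handle" are hypothetical stand-ins for a real resource; they only make the order of acquisition, use, and release visible.

```scala
import zio.{Runtime, Task, UIO, ZManaged}
import scala.collection.mutable.ListBuffer

object TightScopeSketch {
  // Hypothetical event log, only here to make the ordering visible
  val events: ListBuffer[String] = ListBuffer.empty[String]

  // Allocation and finalization are attached to the resource itself
  val resource: ZManaged[Any, Nothing, String] =
    ZManaged.make(UIO { events += "acquire"; "handle" })(_ => UIO { events += "release" })

  // The side effect is wrapped in `Task`, so building `program` runs nothing;
  // the work only happens when a runtime executes the effect
  val program: Task[Unit] =
    resource.use(handle => Task { events += s"use $handle"; () })

  def main(args: Array[String]): Unit = {
    Runtime.default.unsafeRun(program)
    println(events.toList)
  }
}
```

    The release step runs after `use` finishes, whether it succeeds or fails, which is the guarantee a hand-written `close()` call does not give you.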

    So let's go through your example. First of all, I would not suggest using the default Identity-based backend with ZIO; I would recommend the AsyncHttpClientZioBackend instead.

    import java.io.{File, IOException, PrintWriter}
    import sttp.client._
    import sttp.client.asynchttpclient.zio.AsyncHttpClientZioBackend
    import zio.{Managed, Task, UIO, ZIO, ZManaged}
    import zio.blocking.effectBlocking
    
    // Extract the common elements of the request
    val baseRequest = basicRequest.post(uri"$url")
          .headers(content -> "text", char -> "utf-8")
          .response(asString)
    
    // Produces a writer wrapped in a `Managed`, so it is properly closed after being used.
    // Opening the file can fail, so it is wrapped in `ZIO.effect` rather than `UIO`.
    def managedWriter(filename: String): Managed[IOException, PrintWriter] =
      ZManaged.fromAutoCloseable(
        ZIO.effect(new PrintWriter(new File(filename))).refineToOrDie[IOException]
      )
    
    
    // This returns an effect which produces an `SttpBackend`, thus we flatMap over it
    // to extract the backend.
    val program = AsyncHttpClientZioBackend().flatMap { implicit backend => 
      ZIO.foreachPar(files) { file =>
        for {
          // Wrap the synchronous read in an effect that runs on the "blocking" thread pool
          // instead of blocking the main one.
          data <- effectBlocking(readData(file))
          // `send` returns a `Task` because it uses the implicit backend in scope
          resp <- baseRequest.body(data).send()
          // `asString` yields an `Either`: fail on `Left`, keep the body on `Right`
          body <- ZIO.fromEither(resp.body).mapError(msg => new RuntimeException(msg))
          // Build the managed writer, then "use" it; it is closed automatically when `use` ends.
          // The output name is a placeholder: the question does not say how it is derived.
          _    <- managedWriter(s"${file.getName}.out").use(w => effectBlocking(w.write(body)))
        } yield ()
      }
    }
    

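    At this point `program` is only a description of the work; nothing has run yet. To execute it, pass it to one of the runtime's unsafe methods (such as `unsafeRun`) or, if you are using a zio.App, return it from the `run` method, which the App's main method executes for you.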
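    Both ways of running can be sketched as follows, assuming ZIO 1.0.x. `SketchProgram.program` is a hypothetical stand-in for the `program` above (it doubles numbers on the blocking pool instead of calling an API), so that the example compiles without sttp.

```scala
import zio.{App, ExitCode, Runtime, URIO, ZEnv, ZIO}
import zio.blocking.{Blocking, effectBlocking}

object SketchProgram {
  // Stand-in for the answer's `program`: a parallel traversal that needs `Blocking`
  val program: ZIO[Blocking, Throwable, List[Int]] =
    ZIO.foreachPar(List(1, 2, 3))(n => effectBlocking(n * 2))
}

// Option 1: `zio.App` supplies the default environment and runs `run` from its `main`
object SketchApp extends App {
  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    SketchProgram.program.exitCode
}

// Option 2: run explicitly with the default runtime, e.g. from existing non-ZIO code.
// `unsafeRun` blocks until the effect completes and throws if it fails.
object SketchUnsafeRun {
  def doubled(): List[Int] =
    Runtime.default.unsafeRun(SketchProgram.program)
}
```

    In both cases the call blocks until every parallel branch has finished, so the process does not exit while requests are still in flight.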

    * Not always possible or convenient, but it is useful because it prevents resource hogging by yielding tasks back to the runtime for scheduling.