
How can I block terminating my program until the Observable consumption is complete?


I am currently trying to use Monix to throttle API GET requests. I tried sttp's Monix backend, and it worked fine until I couldn't shut the backend down after I was done. As this seems more like an sttp issue than a Monix one, I tried to re-approach the problem by using sttp's default backend, while still using Monix to throttle.

I am mainly struggling with closing the Monix backend once I am done consuming the Observable.

I have tried to simplify the problem through:

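  import monix.eval.Task
  import monix.execution.CancelableFuture
  import monix.execution.Scheduler.Implicits.global
  import monix.reactive.Observable
  import scala.concurrent.duration._

  val someIter = List(Task(1), Task(2))

  val obs: Observable[CancelableFuture[Int]] = Observable
    .fromIterable(someIter)
    .throttle(3.second, 1)
    .map(_.runToFuture)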

However, I am still not sure how to turn off the program after the Observable is consumed, as it terminates prematurely here (unlike the monix backend case)...

In other words, how can I keep the program from terminating until the Observable has been fully consumed?


Solution

  • You can create a Promise, complete it in .doOnComplete when the Observable completes, and await it in the main thread.

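    import monix.eval.Task
    import monix.execution.CancelableFuture
    import monix.execution.Scheduler.Implicits.global
    import monix.reactive.Observable
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    val someIter = List(Task(1), Task(2))
    val promise = Promise[Unit]()
    val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
      .map(_.runToFuture)
      // Complete the promise once the stream signals completion
      .doOnComplete(Task { promise.success(()); () })

    // Nothing is emitted until the Observable is subscribed
    obs.subscribe()
    // Block the main thread until the promise has been completed
    Await.ready(promise.future, Duration.Inf)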
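
    As an alternative, here is a minimal sketch that avoids the explicit Promise, assuming Monix 3.x. Instead of mapping each Task to a future, it evaluates the Tasks inside the stream with mapEval and turns the whole stream into a single Task with toListL, which the main thread can then wait on. The names ThrottleDemo and runAll are hypothetical, and the two Tasks are stand-ins for the real requests.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottleDemo {
  // Hypothetical helper: runs the given Tasks through a throttled stream
  // and blocks the calling thread until the stream has completed.
  def runAll(requests: List[Task[Int]]): List[Int] = {
    val results: Task[List[Int]] = Observable
      .fromIterable(requests)
      .throttle(1.second, 1)      // at most one element per second
      .mapEval(task => task)      // evaluate each Task inside the stream
      .toListL                    // a Task that finishes when the stream does

    // Running the Task subscribes to the stream; Await blocks until it is done
    Await.result(results.runToFuture, Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the real API calls
    val values = runAll(List(Task(1), Task(2)))
    println(values)
  }
}
```

    If the results are not needed, completedL can replace toListL in the same position; it yields a Task[Unit] that finishes when the stream completes.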