Search code examples
scalaconcurrencyparallel-processingfunctional-programmingrx-scala

Scala Observable Creation Blocks My Futures


I am wanting to process ea. query fetch (potentially multiple fetches per query) asynchronously. In order to do this, I pass the processing function (which returns a Future) to my query method to call it for ea. fetch. I don't know beforehand the result size of my query; I only know the max size of my fetch. Therefore, my query returns an Observable (as opposed to a List for ex. where I need to know the size beforehand). The only problem is that when I use Observable create or apply, it will internally block until my Future is completed before it calls the next onNext -- effectively, removing an performance gains I was hoping to get from the futures. The Observable from factory method does not block but it takes an Iterable. I can pass it a mutable Iterable and grow as new fetches come in. Somebody have a more referentially transparent sol'n? Here's the code:

object Repository {
  def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
    // observable (as opposed to list) because modeling a process 
    // where the total result size is unknown beforehand. 
    // Also, not creating or applying because it blocks the futures
    val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
    val obs = Observable.from(mut)
    1 to 2100 by fetchSize foreach { i =>
      mut += f(DataSource.fetch(i, fetchSize))
    }
    obs
  }
}

Solution

  • I was able to remove mutability by using foldLeft:

    (1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
      obs + f(DataSource.fetch(i)())
    }
    

    where:

    implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
      def +(future: Future[Set[Int]]) =
      obs merge (Observable just future)
    }
    

    The only thing is that I don't like what I had to do create an empty Observable that the compiler didn't gripe about. If anyone has a better answer, please post a it and I will mark it.