How do I map an Observable with a function from Future to Future?


Suppose I have an event stream of elements with type In:

val observableIn: Observable[In] = ???

And a function for transforming objects of type In into objects of type Out, but "in the future":

val futureInToFutureOut: (Future[In]) => Future[Out] = ???

At this point I want to transform the elements of observableIn with futureInToFutureOut. That is, I want the result to be an event stream of elements of type Out, one for each element of the original stream, converted via the function futureInToFutureOut.

I think this should work:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

Is this right? Is there a better way to do this?


Solution

  • EDIT:

    Your solution is correct as far as I can tell. If you want to improve performance a bit, consider:

    val observableOut: Observable[Out] = observableIn.flatMap { in =>
      val p = Promise.successful(in)
      Observable.from(futureInToFutureOut(p.future))
    }
    

    This is a bit faster: Promise.successful yields an already-completed future, so it does not schedule an asynchronous computation on the execution context just to wrap the value, as Future.apply does.
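
    The difference can be checked with the standard library alone. The following is a minimal sketch, not part of the original answer: the futureInToFutureOut defined here is a hypothetical stand-in (with In = Int and Out = String), and the object and method names are made up for illustration. It also shows Future.successful, which builds a completed future the same way without going through a Promise.

    ```scala
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object CompletedFutureSketch {
      // Hypothetical stand-in for futureInToFutureOut, with In = Int and Out = String.
      def futureInToFutureOut(in: Future[Int]): Future[String] =
        in.map(n => s"out-$n")

      def run(): String = {
        val viaPromise: Future[Int] = Promise.successful(1).future
        val viaSuccessful: Future[Int] = Future.successful(1)
        // Both are completed on creation; nothing was scheduled to produce the value.
        assert(viaPromise.isCompleted && viaSuccessful.isCompleted)
        // Block only for the sake of the demonstration.
        Await.result(futureInToFutureOut(viaSuccessful), 1.second)
      }

      def main(args: Array[String]): Unit = println(run())
    }
    ```

    Whether the saving matters in practice depends on how many events flow through the Observable; for a low-rate stream either form should do.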

    OLD:

    I am leaving my old suggestion below, which works only in the case that you are mapping a single event in the Observable.

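    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global // needed by Future.foreach

    // Only the first event completes the promise; trySuccess ignores later ones.
    val promiseIn = Promise[In]()
    observableIn.foreach(x => promiseIn.trySuccess(x))
    val observableOut = Observable.create { observer =>
      // futureInToFutureOut takes the Future[In] itself, so it is applied directly.
      futureInToFutureOut(promiseIn.future).foreach { y =>
        observer.onNext(y)
        observer.onCompleted()
      }
    }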
    

    Explanation

    Since you are starting out with an Observable[In] object (i.e. an event stream), you need to find a way to transfer an event from that Observable to a future. The typical way to create a new future is to create its Promise first -- the input side of the future object. You then use foreach on the Observable to invoke trySuccess on the promise when the first event arrives (later events are ignored, because trySuccess has no effect on a promise that is already completed):

    observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
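
    This first-event-wins behaviour is why the approach handles only a single event. A minimal sketch using only scala.concurrent, with a plain List standing in for the events of the Observable (the object and method names are hypothetical):

    ```scala
    import scala.concurrent.Promise

    object TrySuccessSketch {
      // Returns what each trySuccess call reported, plus the value the promise holds.
      def run(): (List[Boolean], Option[Int]) = {
        val promiseIn = Promise[Int]()
        // Simulate three events arriving one after another.
        val accepted = List(1, 2, 3).map(x => promiseIn.trySuccess(x))
        // The promise was completed synchronously, so its value is available here.
        val held = promiseIn.future.value.flatMap(_.toOption)
        (accepted, held)
      }

      def main(args: Array[String]): Unit = println(run())
    }
    ```

    Only the first call returns true; the promise keeps the first value and the other two events are dropped.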
    

    Once the event on the Observable arrives, the promise is completed. We can now get the reading side of the promise, i.e. the future, by calling its future method, and then pass that future to futureInToFutureOut, which takes a Future[In] directly -- futureInToFutureOut(promiseIn.future). Graphically:

    promiseIn.future ---x---> futureInToFutureOut ---f(x)---> futureOut
    

    The resulting future is asynchronously completed with the transformed value f(x). At this point we need to find a way to emit this value back through an Observable[Out]. A typical way to create a new Observable is to call the Observable.create factory method. This method gives us the Observable's writing end -- the Observer, to which we emit events by calling onNext:

    futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
    

    Since we know that a future emits at most a single event, we call onCompleted on the observer to close the output Observable.
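
    The last step, forwarding a future's single result to an observer and then closing the stream, can be sketched without Rx on the classpath. Everything named here is an assumption for illustration: SimpleObserver is a hypothetical stand-in, not the RxScala Observer, and the onError branch is an addition that the code above does not have.

    ```scala
    import scala.collection.mutable.ListBuffer
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    object FutureToObserverSketch {
      // Hypothetical stand-in for an Rx observer; NOT the RxScala trait.
      trait SimpleObserver[T] {
        def onNext(value: T): Unit
        def onError(error: Throwable): Unit
        def onCompleted(): Unit
      }

      // Forward the single result of the future, then close the stream.
      def emit[T](future: Future[T], observer: SimpleObserver[T]): Unit =
        future.onComplete {
          case Success(value) =>
            observer.onNext(value)
            observer.onCompleted()
          case Failure(error) =>
            observer.onError(error)
        }

      // Records the observer calls in order and returns them.
      def run(): List[String] = {
        val events = ListBuffer.empty[String]
        val done = Promise[Unit]()
        val observer = new SimpleObserver[Int] {
          def onNext(value: Int): Unit = events += s"next($value)"
          def onError(error: Throwable): Unit = { events += "error"; done.trySuccess(()) }
          def onCompleted(): Unit = { events += "completed"; done.trySuccess(()) }
        }
        emit(Future.successful(42), observer)
        // Wait for the terminal event before reading the recorded calls.
        Await.result(done.future, 1.second)
        events.toList
      }

      def main(args: Array[String]): Unit = println(run())
    }
    ```

    Routing failures to onError is a design choice of this sketch; with foreach alone, a failed future would leave the output stream without a terminal event.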

    Edit: if you want to master Rx and Scala futures, you may want to consider this book, which deals with these topics. Disclaimer: I'm the author.