Suppose I have an event stream of elements with type In
:
val observableIn: Observable[In] = ???
And a function for transforming objects of type In
into objects of type Out
, but "in the future":
val futureInToFutureOut: (Future[In]) => Future[Out] = ???
At this point I want to transform the elements of my observableIn
according to my function futureInToFutureOut
. That is, I want as my result an event stream of elements of type Out
, matching the elements of the original stream, but converted via function futureInToFutureOut
.
I think this should work:
val observableOut: Observable[Out] = observableIn flatMap { in =>
Observable.from(futureInToFutureOut(Future(in)))
}
Is this right? Is there a better way to do this?
EDIT:
Your solution is correct as far as I can tell. If you want to improve performance a bit, consider:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
This is a bit faster, as it does not create an asynchronous computation to map the future like Future.apply
does.
OLD:
I am leaving my old suggestion below, which works only in the case that you are mapping a single event in the Observable
.
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
Since you are starting out with an Observable[In]
object (i.e. an event stream), you need to find a way to transfer an event from that Observable
to a future. The typical way to create a new future is to create its Promise
first -- the input side of the future object. You then use foreach
on the Observable
to invoke trySuccess
on the future when the first event arrives:
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
Once the event on the Observable
arrives, the promise will be asynchronously completed. We can now get the reading side of the promise, i.e. the future by calling its future
method; and then call map
on the future -- promiseIn.future.map(futureInToFutureOut)
. Graphically:
promiseIn.future ---x---> map ---f(x)---> futureOut
The resulting future is asynchronously completed with futureInToFutureOut(x)
. At this point we need to find a way to emit this value back through an Observable[Out]
. A typical way to create new Observable
is to call the Observable.create
factory method. This method gives as the Observable
's writing end -- the Observer
, which we use to emit events to by calling onNext
:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
Since we know that a future emits at most a single event, we call the onCompleted
on the observer, to close the output Observable
.
Edit: if you want to master Rx and Scala futures, you may want to consider this book, which deals with these topics. Disclaimer: I'm the author.