So I have some RxSwift code where I want to perform a chain of asynchronous operations, all composed using observables. flatMap
is the way to do this, and it works great, however it doesn't seem to be able to pass variables down the chain that I can figure out. This is best illustrated by some pseudocode
Assume have 3 functions
class Connection {
static func establish(address:String) -> Observable<Connection>
func sendData(data:String) -> Observable<Int> // num bytes written or something
func close() -> Observable<Void>
}
And I want to call them in a chain such that we connect, send, then close. Something like this
Connection.establish(host)
.flatMap{ connection in connection.sendData("foo") }
.flatMap{ numBytes in ????.close() }
.subscribeNext{ /* all done */ }
The problem is that flatMap
doesn't pass it's input parameters down the chain, so that the closure passed to subscribeNext
doesn't have access to the connection
object, and as such it can't call close.
I could do some awful hack like the following, but I'd really rather not!
var connection:Connection?
Connection.establish(host)
.flatMap{ c in
connection = c
return c.sendData("foo")
}
.flatMap{ numBytes in connection!.close() }
.subscribeNext{ /* all done */ }
In the C# version of Rx, this is solved by using an overload to SelectMany
which takes a second closure, which combines the 2 values (usually into a Tuple) and then that thing is propagated down the chain. I've written this as an extension for RxSwfit, and it works as follows:
Connection.establish(host)
.flatMap(
{ connection in connection.sendData("foo") },
combine: { ($0, $1) }) // tupleify
.flatMap{ (connection, numbytes) in connection.close() }
.subscribeNext{ /* all done */ }
This is all well and good, but my primary question is - Is there a better way to do this which is built into RxSwift as it currently stands?
Additionally, writing this extension method is not simple nor easy. I basically re-implemented FlatMap from scratch by copy/pasting the one in MiniRxSwift and modifying it. If we have to write this extension, is there a better way to implement it using RxSwift constructs?
There are two ways to do what you want "using RxSwift constructs."
Connection.establish(host)
.flatMap { Observable.combineLatest(Observable.just($0), $0.sendData("foo")) }
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
or if you don't mind inserting into a map you could:
Connection.establish(host)
.flatMap { connection in
connection.sendData("foo").map { (connection, $0) }
}
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
Note that combineLatest
and map
were both in the library from the beginning.