I have to fetch three types of data (AType, BType, CType) using three separate API requests. The objects returned by the APIs are related one-to-many:
- Each AType object is the parent of N BType objects
- Each BType object is the parent of P CType objects
I'm using the following three functions to fetch each type:
func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}
To avoid nested subscriptions, these three functions are chained using flatMap:
func getAll() -> Observable<CType> {
    return self.get_A_objects()
        .flatMap { (aa:AType) in return get_B_objects(aa.id) }
        .flatMap { (bb:BType) in return get_C_objects(bb.id) }
}
func setup() {
    self.getAll().subscribeNext { _ in
        print ("One more item fetched")
    }
}
The above code works fine: when there are M objects of AType, I can see the text "One more item fetched" printed M×N×P times.
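To make the M×N×P count concrete, here is a minimal sketch that uses plain Swift arrays instead of Observables. The struct definitions, the fetchA/fetchB/fetchC helpers, and the counts 2, 3 and 4 are all hypothetical stand-ins, not the real API calls. It only illustrates how two chained flatMap steps flatten a two-level one-to-many relation.

```swift
// Hypothetical stand-ins for the three model types in the question.
struct AType { let id: Int }
struct BType { let id: Int }
struct CType { let id: Int }

// Illustrative counts (assumptions, not values from the question).
let m = 2, n = 3, p = 4

// Synchronous placeholders for the three API requests.
func fetchA() -> [AType] {
    return (0..<m).map { AType(id: $0) }
}
func fetchB(parentID: Int) -> [BType] {
    return (0..<n).map { BType(id: parentID * n + $0) }
}
func fetchC(parentID: Int) -> [CType] {
    return (0..<p).map { CType(id: parentID * p + $0) }
}

// Same shape as getAll(): each flatMap flattens one parent-to-children level.
let allC = fetchA()
    .flatMap { fetchB(parentID: $0.id) }
    .flatMap { fetchC(parentID: $0.id) }

print(allC.count)  // 24, i.e. 2 x 3 x 4
```

With Observables the emissions may arrive asynchronously and interleaved, but the total number of CType items follows the same product.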
I'd like to set up the getAll() function to deliver status updates throughout the chain using ReplaySubject<String>. My initial thought was to write something like:
func getAll() -> ReplaySubject<String> {
    let msg = ReplaySubject<String>.createUnbounded()
    self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
        .flatMap { (aa:AType) in
            return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
        }
        .flatMap { (bb:BType) in
            return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
        }
    return msg
}
But this attempt failed: the following print() does not print anything.
getAll().subscribeNext {
    print ($0)
}
How should I rewrite my logic?
It's because nothing ever subscribes to the chain you build, and you're not retaining your Disposables, so the chain does nothing.
In getAll, you create an Observable<AType> via get_A_objects() and attach operators to it, yet the result is never subscribed to or added to a DisposeBag. When it goes out of scope (at the end of the func), it is deallocated. So { aobj in msg.onNext ("Fetching A \(aobj)") } will never run.
Also, you aren't retaining the Disposable returned from getAll().subscribeNext either, so nothing manages the lifetime of that subscription.
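The point that an Observable chain does no work until something subscribes to it can be checked with a small sketch. It assumes a current RxSwift release, where doOnNext, subscribeNext and addDisposableTo from the code in this thread are spelled do(onNext:), subscribe(onNext:) and disposed(by:). The demo function and its log array are hypothetical names used only for illustration.

```swift
import RxSwift

// Hypothetical helper: reports how many side effects ran before and
// after the chain was subscribed to.
func demo() -> (before: Int, after: Int) {
    var log = [String]()

    // Building the chain only describes the work; no element flows yet.
    let chain = Observable.of(1, 2, 3)
        .do(onNext: { log.append("saw \($0)") })

    let before = log.count  // no subscriber yet, so the side effect has not run

    // The DisposeBag owns the subscription for the duration of this function.
    let bag = DisposeBag()
    chain
        .subscribe(onNext: { _ in })
        .disposed(by: bag)

    return (before, log.count)
}

let counts = demo()
print("side effects before subscribe: \(counts.before), after: \(counts.after)")
```

Observable.of delivers its elements synchronously here, which is why the second count can be read right after subscribing. With a real network request the elements would arrive later, and the subscription would need to be kept alive until then.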
Since you want two Observables: one for the actual final results (Observable<CType>), and one for the progress status (ReplaySubject<String>), you should return both from your getAll() function, so that both can be "owned" and their lifetimes managed.
func getAll() -> (Observable<CType>, ReplaySubject<String>) {
    let progress = ReplaySubject<String>.createUnbounded()
    let results = self.get_A_objects()......
    return (results, progress)
}
let (results, progress) = getAll()
progress
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)
results
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)
Some notes:
- You're using createUnbounded, which could be dangerous if you aren't careful.
- You may not want ReplaySubject at all, since it would be a lie to say that you're "fetching" something if someone subscribes later and gets an old progress status message. Consider using PublishSubject.
- If you do switch to PublishSubject, subscribe to progress before results to be sure that you don't miss any progress status messages, since the output won't be buffered anymore.
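Putting the notes together, one possible shape for getAll() with a PublishSubject is sketched below. This is an illustration under assumptions, not a drop-in implementation: the three get_*_objects functions are hypothetical stubs that emit fixed values instead of calling an API, their parameters take an underscore label so they can be called as in the question, and the operator names are those of a current RxSwift release (do(onNext:), subscribe(onNext:), disposed(by:)).

```swift
import RxSwift

struct AType { let id: Int }
struct BType { let id: Int }
struct CType { let id: Int }

// Hypothetical stubs standing in for the real API requests.
func get_A_objects() -> Observable<AType> {
    return Observable.of(AType(id: 1), AType(id: 2))
}
func get_B_objects(_ a_parentid: Int) -> Observable<BType> {
    return Observable.of(BType(id: a_parentid * 10))
}
func get_C_objects(_ b_parentid: Int) -> Observable<CType> {
    return Observable.of(CType(id: b_parentid * 10))
}

// Returns both streams so the caller can own both subscriptions.
func getAll() -> (Observable<CType>, PublishSubject<String>) {
    let progress = PublishSubject<String>()
    let results = get_A_objects()
        .do(onNext: { progress.onNext("Fetching A \($0.id)") })
        .flatMap { (aa: AType) in
            get_B_objects(aa.id)
                .do(onNext: { progress.onNext("Fetching B \($0.id)") })
        }
        .flatMap { (bb: BType) in
            get_C_objects(bb.id)
                .do(onNext: { progress.onNext("Fetching C \($0.id)") })
        }
    return (results, progress)
}

let disposeBag = DisposeBag()
var messages = [String]()
var fetched = [CType]()

let (results, progress) = getAll()

// Subscribe to progress first: PublishSubject does not replay earlier events.
progress
    .subscribe(onNext: { messages.append($0) })
    .disposed(by: disposeBag)

// Subscribing to results is what actually starts the chain.
results
    .subscribe(onNext: { fetched.append($0) })
    .disposed(by: disposeBag)

print("\(messages.count) progress messages, \(fetched.count) results")
```

Note that each subscription to results runs the chain again and therefore pushes another round of messages into progress. If several consumers need the results, sharing the results Observable is worth considering.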