Tags: swift, reactive-programming, rx-swift, behaviorsubject

RxSwift: Nested Queries and ReplaySubject


I have to fetch three types of data (AType, BType, CType) using three separate API requests. The objects returned by the APIs are related by one-to-many:

  • 1 AType object is parent of N BType objects
  • 1 BType object is parent of P CType objects

I'm using the following three functions to fetch each type:

func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}

and to avoid nested subscriptions, these three functions are chained using flatMap:

func getAll() -> Observable<CType> {
  return self.get_A_objects()
     .flatMap { (aa:AType) in  return get_B_objects(aa.id) }
     .flatMap { (bb:BType) in  return get_C_objects(bb.id) }
}

func setup() {
  self.getAll().subscribeNext { _ in
    print ("One more item fetched") 
  }
}

The above code works fine: when there are M objects of AType, the text "One more item fetched" is printed M×N×P times.
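As a rough illustration of that count, here is a minimal sketch with hypothetical stubs in place of the real API calls (fixed in-memory ids, M = 2, N = 3, P = 2). The stub bodies and the `id` fields are assumptions, and the sketch uses the current RxSwift spelling `subscribe(onNext:)` rather than the older `subscribeNext` used in this thread.

```swift
import RxSwift

// Hypothetical models; the real types are not shown in the question.
struct AType { let id: Int }
struct BType { let id: Int }
struct CType { let id: Int }

// Stubbed fetchers that emit fixed values synchronously instead of calling an API.
func get_A_objects() -> Observable<AType> {
    return Observable.from([AType(id: 1), AType(id: 2)])                     // M = 2
}
func get_B_objects(a_parentid: Int) -> Observable<BType> {
    return Observable.from((0..<3).map { BType(id: a_parentid * 10 + $0) })  // N = 3
}
func get_C_objects(b_parentid: Int) -> Observable<CType> {
    return Observable.from((0..<2).map { CType(id: b_parentid * 10 + $0) })  // P = 2
}

// Same shape as the chained getAll() above: each flatMap fans out to the children.
func getAll() -> Observable<CType> {
    return get_A_objects()
        .flatMap { (aa: AType) -> Observable<BType> in get_B_objects(a_parentid: aa.id) }
        .flatMap { (bb: BType) -> Observable<CType> in get_C_objects(b_parentid: bb.id) }
}

var fetched = 0
let subscription = getAll().subscribe(onNext: { _ in fetched += 1 })
print("Items fetched: \(fetched)")
subscription.dispose()
```

With these synchronous stubs the counter should reach 2 × 3 × 2 = 12 by the time `subscribe` returns; a real network-backed source would deliver the items later.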

I'd like to set up the getAll() function to deliver status updates throughout the chain using ReplaySubject<String>. My initial thought was to write something like:

func getAll() -> ReplaySubject<String> {
  let msg = ReplaySubject<String>.createUnbounded()
  self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
    .flatMap { (aa:AType) in 
       return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
    }
    .flatMap { (bb:BType) in
       return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
    }

  return msg
}

but this attempt failed, i.e., the following print() does not print anything.

getAll().subscribeNext {
  print ($0)
}

How should I rewrite my logic?


Solution

    Problem

    It's because the chain you build in getAll() is never subscribed to and no Disposable is retained, so it does nothing.

    In getAll, you create an Observable<AType> via get_A_objects() and attach doOnNext and flatMap to it, yet you never subscribe to the result, so there is no Disposable to add to a DisposeBag. An Observable chain does no work until something subscribes to it. When the chain goes out of scope (at the end of the func), it is deallocated. So { aobj in msg.onNext ("Fetching A \(aobj)") } will never run.

    Also, you aren't retaining the Disposable returned from getAll().subscribeNext either, so nothing manages the lifetime of that subscription. It should go into a DisposeBag as well.
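A minimal sketch of the laziness described above, assuming a synchronous in-memory source in place of get_A_objects(). The `log` array and the variable names are illustrative only, and the code uses the current RxSwift names `do(onNext:)`, `subscribe(onNext:)` and `disposed(by:)` rather than the older `doOnNext`, `subscribeNext` and `addDisposableTo`.

```swift
import RxSwift

var log: [String] = []

// Building the chain only describes the work; the side effect has not run yet.
let chain = Observable.of(1, 2, 3)
    .do(onNext: { log.append("side effect \($0)") })

let countBeforeSubscribe = log.count   // nobody has subscribed so far

// Subscribing starts the sequence; the Disposable is kept in a bag we own.
let disposeBag = DisposeBag()
chain
    .subscribe(onNext: { _ in })
    .disposed(by: disposeBag)

let countAfterSubscribe = log.count    // the side effect ran once per element
print(countBeforeSubscribe, countAfterSubscribe)
```

The same applies to the failing getAll(): the doOnNext closures are attached to a chain that nobody subscribes to, so msg never receives anything.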

    Solution

    Since you want two Observables: one for the actual final results (Observable<CType>), and one for the progress status (ReplaySubject<String>), you should return both from your getAll() function, so that both can be "owned", and their lifetime managed.

    func getAll() -> (Observable<CType>, ReplaySubject<String>) {
        let progress = ReplaySubject<String>.createUnbounded()
        let results = self.get_A_objects()......
        return (results, progress)
    }
    
    let (results, progress) = getAll()
    
    progress
        .subscribeNext {
            print ($0)
        }
        .addDisposableTo(disposeBag)
    
    results
        .subscribeNext {
            print ($0)
        }
        .addDisposableTo(disposeBag)
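For reference, one way the elided chain could look, as a sketch only. The stubbed fetchers, the `id` fields and the "Fetched ..." message wording are assumptions, and the current RxSwift names `do(onNext:)`, `subscribe(onNext:)` and `disposed(by:)` replace the older `doOnNext`, `subscribeNext` and `addDisposableTo`.

```swift
import RxSwift

// Hypothetical models and stubbed fetchers standing in for the real API calls.
struct AType { let id: Int }
struct BType { let id: Int }
struct CType { let id: Int }

func get_A_objects() -> Observable<AType> { return Observable.just(AType(id: 1)) }
func get_B_objects(a_parentid: Int) -> Observable<BType> {
    return Observable.just(BType(id: a_parentid * 10))
}
func get_C_objects(b_parentid: Int) -> Observable<CType> {
    return Observable.just(CType(id: b_parentid * 10))
}

// Returns both sequences so the caller can own both subscriptions.
func getAll() -> (Observable<CType>, ReplaySubject<String>) {
    let progress = ReplaySubject<String>.createUnbounded()
    let results = get_A_objects()
        .do(onNext: { progress.onNext("Fetched A \($0.id)") })
        .flatMap { (aa: AType) -> Observable<BType> in
            get_B_objects(a_parentid: aa.id)
                .do(onNext: { progress.onNext("Fetched B \($0.id)") })
        }
        .flatMap { (bb: BType) -> Observable<CType> in
            get_C_objects(b_parentid: bb.id)
                .do(onNext: { progress.onNext("Fetched C \($0.id)") })
        }
    return (results, progress)
}

let disposeBag = DisposeBag()
let (results, progress) = getAll()
var messages: [String] = []
var items: [CType] = []

progress
    .subscribe(onNext: { messages.append($0) })
    .disposed(by: disposeBag)

// Subscribing to results is what actually drives the fetches and the progress messages.
results
    .subscribe(onNext: { items.append($0) })
    .disposed(by: disposeBag)

print(messages)
```

Note that the side effects belong to the results chain: in this sketch, every additional subscription to results would run the fetches again and push a fresh set of messages into progress.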
    

    Some notes:

    • You shouldn't need to use createUnbounded, which keeps every element in memory for replay with no upper limit and could be dangerous if you aren't careful.
    • You probably don't really want to use ReplaySubject at all, since a late subscriber would receive old progress messages claiming that something is being "fetched" long after that fetch has finished. Consider using PublishSubject.
    • If you follow the above recommendation, then you just need to make sure that you subscribe to progress before results to be sure that you don't miss any progress status messages, since the output won't be buffered anymore.
    • Also, just my opinion, but I would re-word "Fetching X Y" to something else, since you aren't "fetching", but you have already "fetched" it.
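The ordering point in the notes above can be sketched as follows, assuming a hypothetical single-stage pipeline (`makePipeline`) with a synchronous source instead of the three real fetchers. It uses the current RxSwift names `do(onNext:)`, `subscribe(onNext:)` and `disposed(by:)`.

```swift
import RxSwift

// Hypothetical pipeline: `progress` reports each element that `results` emits.
func makePipeline() -> (Observable<Int>, PublishSubject<String>) {
    let progress = PublishSubject<String>()
    let results = Observable.of(1, 2)
        .do(onNext: { progress.onNext("Fetched \($0)") })
    return (results, progress)
}

let disposeBag = DisposeBag()

// Progress subscribed first: every message is observed.
var seenEarly: [String] = []
let (resultsA, progressA) = makePipeline()
progressA.subscribe(onNext: { seenEarly.append($0) }).disposed(by: disposeBag)
resultsA.subscribe().disposed(by: disposeBag)

// Progress subscribed after results has already run: PublishSubject replays nothing.
var seenLate: [String] = []
let (resultsB, progressB) = makePipeline()
resultsB.subscribe().disposed(by: disposeBag)
progressB.subscribe(onNext: { seenLate.append($0) }).disposed(by: disposeBag)

print(seenEarly, seenLate)
```

With a synchronous source like this one, the late subscriber sees no messages at all; with an asynchronous source it would typically miss only the messages emitted before it subscribed.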