Search code examples
iosswiftrx-swift

RXSwift Not subscribing on Main Thread


I am trying to make several API calls and populate a Realm Database. Everything works fine. However when I try to run performSegue() on subscribe() method an exception is raised, informing that I can't do this on a background thread, which is perfectly reasonable.

But since I am subscribing to MainScheduler.instance shouldn't the subscribe() method run on UI Thread?

Single.zip(APIClient.shared.getSchools(), APIClient.shared.getPointsOfInterest())
        .observeOn(SerialDispatchQueueScheduler(qos: .background))
        .flatMap { zip in return Single.zip(SchoolDao.shared.insertSchools(schoolsJson: zip.0), PointOfInterestDao.shared.insertPointsOfInterest(poisJson: zip.1))}
        .flatMap{ _ in Single.zip(SchoolDao.shared.countSchools(), PointOfInterestDao.shared.countPointsOfInterest())}
        .subscribeOn(MainScheduler.instance)
        .subscribe(onSuccess: { tableCounts in
            let (schoolsCount, poisCount) = tableCounts
            if(schoolsCount != 0 && poisCount != 0){
                print(Thread.isMainThread) //Prints False
                self.performSegue(withIdentifier: "splashToLogin", sender: nil)
            }
        }, onError: {
            error in return
        }).disposed(by: disposeBag)

Am I making a wrong assumption on how does RXSwift works?

Edit: If I add this line .observeOn(MainScheduler.instance) after .subscribeOn(MainScheduler.instance) the subscribe method runs on Main thread. Is this correct behavior? What is .subscribeOn(MainScheduler.instance) even doing?


Solution

  • Your edit explains all. Your initial assumption on what subscribeOn and observeOn were backwards.

    The subscribeOn operator refers to how the observable above the operator in the chain subscribes to the source of events (and likely doesn't do what you think it does in any case. Your two network calls likely set up their own background thread to emit values on regardless of how they are subscribed to.)

    For example, look at this:

    extension ObservableType {
        func subscribeOnMain() -> Observable<Element> {
            Observable.create { observer in
                let disposable = SingleAssignmentDisposable()
                DispatchQueue.main.async {
                    disposable.setDisposable(self.subscribe(observer))
                }
                return disposable
            }
        }
    }
    

    It makes it obvious why the operator is called subscribeOn. It's because the subscribe is happening on the scheduler/thread in question. And this helps you understand better what is happening when you stack subscribeOn operators...

    The observeOn operator refers to the scheduler that will be emitting elements to the observer (which is the block(s) of code that are passed to the subscribe operator.)

    Which would look like this:

    extension ObservableType {
        func observeOnMain() -> Observable<Element> {
            Observable.create { observer in
                self.subscribe { event in
                    DispatchQueue.main.async {
                        observer.on(event)
                    }
                }
            }
        }
    }
    

    From this you can see that the subscribe is happening on the original scheduler, while the observer is being called on the new scheduler.

    Here is a great article explaining the whole thing: http://rx-marin.com/post/observeon-vs-subscribeon/