I am trying to make several API calls and populate a Realm database. Everything works fine. However, when I try to call performSegue() in the subscribe() closure, an exception is raised informing me that I can't do this on a background thread, which is perfectly reasonable. But since I am subscribing on MainScheduler.instance, shouldn't the subscribe() closure run on the UI thread?

    Single.zip(APIClient.shared.getSchools(), APIClient.shared.getPointsOfInterest())
        .observeOn(SerialDispatchQueueScheduler(qos: .background))
        .flatMap { zip in
            return Single.zip(SchoolDao.shared.insertSchools(schoolsJson: zip.0), PointOfInterestDao.shared.insertPointsOfInterest(poisJson: zip.1))
        }
        .flatMap { _ in
            Single.zip(SchoolDao.shared.countSchools(), PointOfInterestDao.shared.countPointsOfInterest())
        }
        .subscribeOn(MainScheduler.instance)
        .subscribe(onSuccess: { tableCounts in
            let (schoolsCount, poisCount) = tableCounts
            if schoolsCount != 0 && poisCount != 0 {
                print(Thread.isMainThread) // Prints false
                self.performSegue(withIdentifier: "splashToLogin", sender: nil)
            }
        }, onError: { error in
            return
        })
        .disposed(by: disposeBag)

Am I making a wrong assumption about how RxSwift works?
Edit: If I add .observeOn(MainScheduler.instance) after .subscribeOn(MainScheduler.instance), the subscribe closure runs on the main thread. Is this the correct behavior? What is .subscribeOn(MainScheduler.instance) even doing?
Your edit explains it all. Your initial assumptions about what subscribeOn and observeOn do were backwards.
The subscribeOn operator determines the scheduler on which the observable above the operator in the chain is subscribed to, that is, where the subscription to the source of events takes place. It likely doesn't do what you think it does in any case: your two network calls likely set up their own background thread to emit values on, regardless of how they are subscribed to.
For example, look at this:

    extension ObservableType {
        func subscribeOnMain() -> Observable<Element> {
            Observable.create { observer in
                let disposable = SingleAssignmentDisposable()
                DispatchQueue.main.async {
                    disposable.setDisposable(self.subscribe(observer))
                }
                return disposable
            }
        }
    }

It makes it obvious why the operator is called subscribeOn. It's because the subscribe is happening on the scheduler/thread in question. And this helps you understand better what is happening when you stack subscribeOn operators...
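To make the stacking case concrete, here is a minimal sketch, assuming the RxSwift 5 spelling used in the question (newer releases spell the operator subscribe(on:)). The `source` observable and its print statements are hypothetical and exist only for illustration. Because each subscribeOn only moves the subscription of whatever sits above it, the one closest to the source should be the one that decides where the source's subscription code runs.

```swift
import Foundation
import RxSwift

let disposeBag = DisposeBag()

// Hypothetical source that reports which thread its subscription code runs on.
let source = Observable<Int>.create { observer in
    print("subscription code on main thread:", Thread.isMainThread)
    observer.onNext(1)
    observer.onCompleted()
    return Disposables.create()
}

source
    // Closest to the source: this scheduler runs the subscription code above.
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    // Farther from the source: only decides where the operator above gets subscribed.
    .subscribeOn(MainScheduler.instance)
    .subscribe(onNext: { value in print("received", value) })
    .disposed(by: disposeBag)
```

This is meant to run inside an app or playground that stays alive long enough for the background queue to do its work.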
The observeOn operator refers to the scheduler that will be emitting elements to the observer (which is the block(s) of code that are passed to the subscribe operator).
A simplified version of it would look like this:

    extension ObservableType {
        func observeOnMain() -> Observable<Element> {
            Observable.create { observer in
                self.subscribe { event in
                    DispatchQueue.main.async {
                        observer.on(event)
                    }
                }
            }
        }
    }

From this you can see that the subscribe is happening on the original scheduler, while the observer is being called on the new scheduler.
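Applied to the chain in the question, one possible fix is to keep the background observeOn for the database work and add a second observeOn(MainScheduler.instance) just before subscribe. The sketch below is only an illustration: `fetchCounts()` is a hypothetical stand-in for the API and DAO calls, and it uses the RxSwift 5 names from the question (RxSwift 6 renames these to observe(on:) and onFailure).

```swift
import Foundation
import RxSwift

let disposeBag = DisposeBag()

// Hypothetical stand-in for the network and database calls in the question.
func fetchCounts() -> Single<(Int, Int)> {
    Single.just((3, 5))
}

fetchCounts()
    // Everything below this line runs on a background queue...
    .observeOn(SerialDispatchQueueScheduler(qos: .background))
    .map { counts in (counts.0 * 2, counts.1 * 2) }
    // ...until this line hops back to the main thread for the observer.
    .observeOn(MainScheduler.instance)
    .subscribe(onSuccess: { counts in
        // UI work such as performSegue(withIdentifier:sender:) is safe here.
        print("on main thread:", Thread.isMainThread, "counts:", counts)
    }, onError: { error in
        print("failed:", error)
    })
    .disposed(by: disposeBag)
```

With this arrangement the subscribeOn(MainScheduler.instance) call from the question is no longer needed for the segue, since the thread the observer runs on is set by the last observeOn in the chain.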
Here is a great article explaining the whole thing: http://rx-marin.com/post/observeon-vs-subscribeon/