Tags: observable, swift4, grand-central-dispatch, rx-swift

How do I kick off a chain of observables on a particular queue?


My understanding is that subscribeOn tells Rx which queue the sequence should begin on, and that observeOn switches the scheduler for everything downstream of it.

This doesn't seem to be the case and it seems I'm lacking some basic understanding.

Please consider the example below.

override func viewDidLoad() {
    super.viewDidLoad()
    // Do any additional setup after loading the view.
    operation1()
        .observeOn(realmReadScheduler)
        .observeOn(realmWriteScheduler)
        .flatMap(self.operation2)
        .observeOn(realmReadScheduler)
        .flatMap(operation3)
        .observeOn(realmSignalScheduler)
        .flatMap(operation4)
        .observeOn(realmConvertScheduler)
        .flatMap(operation5)
        .subscribeOn(realmConvertScheduler)
        .subscribe(onNext: { success in
            print(success)
        })
}

Which prints out:

> Operation 1 com.apple.main-thread
> Operation 2 com.jci.xaap.realm.write
> Operation 3 com.jci.xaap.realm.read
> Operation 4 rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
> Operation 5 com.jci.xaap.realm.convert

I had expected Operation 1 to execute on realmConvertScheduler, not on the main thread.

How can I make sure that the very first observable in my chain executes on the queue that I want?


Solution

  • I suspect your operation1() isn't written correctly. Note that the function itself is called on the main thread, so any work done directly in the function body, outside the Observable, runs there. The work that the Observable does is performed on whatever thread subscribeOn tells it to use (or on the thread that subscribe is called on, if there is no subscribeOn).

    Here is an example:

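    import Foundation
    import RxSwift

    let schedulerA = ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "schedulerA"))

    func operation1() -> Observable<Void> {
        // This closure runs at subscription time, on the subscribeOn scheduler.
        return Observable.create { observer in
            print("do your work on:", Thread.current)
            observer.onNext(())
            observer.onCompleted()
            return Disposables.create()
        }
    }

    operation1()
        .observeOn(MainScheduler.instance) // events below this point arrive on the main thread
        .subscribeOn(schedulerA)           // the subscription (and the work above) starts on schedulerA
        .subscribe(onNext: {
            print("subscribe(onNext:) on:", Thread.current)
        })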
    

    Another option would be to write it this way:

    func operation1() -> Observable<Void> {
        return Observable.deferred {
            print("do your work on:", Thread.current)
            return .just(())
        }
    }
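
To make the difference concrete, here is a minimal sketch that contrasts an operation1 doing its work eagerly with one doing it lazily. The eager version is only a guess at what the original operation1() might look like, since the question doesn't show it. The names convertScheduler, eagerOperation1 and lazyOperation1 are made up for illustration, and the calls use the same RxSwift 4 spelling as the code above.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for realmConvertScheduler from the question.
let convertScheduler = ConcurrentDispatchQueueScheduler(
    queue: DispatchQueue(label: "example.convert"))

// Eager (assumed shape of the problem): the print runs when the function
// is called, on the caller's thread, before anything subscribes.
// subscribeOn has no effect on it.
func eagerOperation1() -> Observable<Bool> {
    print("eager work on:", Thread.current)
    return .just(true)
}

// Lazy: the print lives inside the subscription closure, so it runs
// on whatever scheduler subscribeOn selects.
func lazyOperation1() -> Observable<Bool> {
    return Observable.create { observer in
        print("lazy work on:", Thread.current)
        observer.onNext(true)
        observer.onCompleted()
        return Disposables.create()
    }
}

let disposeBag = DisposeBag()

eagerOperation1()
    .subscribeOn(convertScheduler)
    .subscribe(onNext: { print("eager result:", $0) })
    .disposed(by: disposeBag)

lazyOperation1()
    .subscribeOn(convertScheduler)
    .subscribe(onNext: { print("lazy result:", $0) })
    .disposed(by: disposeBag)
```

Called from the main thread, "eager work on:" should report the main thread, while "lazy work on:" should report a background thread. Run it from a playground or an app; a command-line script may exit before the background queue gets a chance to run.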