Search code examples
swiftreactive-programmingrx-swift

Limiting concurrent access to a service class with RxSwift


Given a service class like this:

class Service {
    let networkService = NetworkService()

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { $0.a }
    }
}

And when I use it from the caller side like this:

let service = Service()

Observable
    .from(["Hello", "World"])
    .flatMap {
        service.handleJobA($0)
    }
    .subscribe()

Then this would send multiple requests to service at the same time. I wanted for the stream to wait until each request is done. That was achievable using the merge operator.

Observable
    .from(["Hello", "World"])
    .flatMap {
        Observable.just(
            service.handleJobA($0)
        )
    }
    .merge(maxConcurrent: 1)
    .subscribe()

So far, so good - the service will not perform multiple handleJobA tasks at the same time.

However, the concurrency is a service detail and the caller should NOT care about it. In fact, the service, at a later stage, might decide to allow for difference concurrency values.

Secondly, when I add a new method handleJobB, it must not be active at the same time as job A, and vice versa.

So my question is:

  1. How can I restrict the maxConcurrency to the handleJobA observable as a implementation detail?
  2. Which RxSwift pattern would allow to restrict this for any service method?

Solution

  • You need a serial Scheduler that is dedicated to that service. Here is an example that can be pasted to a playground:

    /// playground
    
    import RxSwift
    
    class Service {
    
        func handleJobA(input: String) -> Observable<String> {
    
            return Observable.create { observer in
                print("start job a")
                sleep(3)
                observer.onNext(input)
                print("complete job a")
                observer.onCompleted()
                return Disposables.create()
            }.subscribeOn(scheduler)
        }
    
        func handleJobB(input: String) -> Observable<String> {
            return Observable.create { observer in
                print("start job b")
                sleep(3)
                observer.onNext(input)
                print("complete job b")
                observer.onCompleted()
                return Disposables.create()
                return Disposables.create()
            }.subscribeOn(scheduler)
        }
    
        let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
    }
    
    
    let service = Service()
    
    _ = Observable.from(["hello","world","swift"])
        .flatMap { service.handleJobA(input: $0) }
        .subscribe(onNext:{
            print("result " + $0)
        })
    
    _ = Observable.from(["hello","world","swift"])
        .flatMap { service.handleJobB(input: $0) }
        .subscribe(onNext:{
            print("result " + $0)
        })
    
    import PlaygroundSupport
    
    PlaygroundPage.current.needsIndefiniteExecution = true