Given a service class like this:
class Service {
let networkService = NetworkService()
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { $0.a }
}
}
And when I use it from the caller side like this:
let service = Service()
Observable
.from(["Hello", "World"])
.flatMap {
service.handleJobA($0)
}
.subscribe()
Then this would send multiple requests to service
at the same time. I wanted for the stream to wait until each request is done. That was achievable using the merge
operator.
Observable
.from(["Hello", "World"])
.flatMap {
Observable.just(
service.handleJobA($0)
)
}
.merge(maxConcurrent: 1)
.subscribe()
So far, so good - the service will not perform multiple handleJobA
tasks at the same time.
However, the concurrency is a service detail and the caller should NOT care about it. In fact, the service, at a later stage, might decide to allow for difference concurrency values.
Secondly, when I add a new method handleJobB
, it must not be active at the same time as job A, and vice versa.
So my question is:
You need a serial Scheduler that is dedicated to that service. Here is an example that can be pasted to a playground:
/// playground
import RxSwift
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job a")
sleep(3)
observer.onNext(input)
print("complete job a")
observer.onCompleted()
return Disposables.create()
}.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job b")
sleep(3)
observer.onNext(input)
print("complete job b")
observer.onCompleted()
return Disposables.create()
return Disposables.create()
}.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobA(input: $0) }
.subscribe(onNext:{
print("result " + $0)
})
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobB(input: $0) }
.subscribe(onNext:{
print("result " + $0)
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true