I create an Observable that emits events by calling onNext
from a DispatchQueue.async
, and my corresponding unit test needs to sleep(...)
in order to actually receive the events, even though I use the TestScheduler
.
I create the following Observable
, which reads events from a gRPC stream. I believe that gRPC is not so important here: just note that I call.receive()
in a while
loop, and feed that to onNext
.
private func createPositionObservable() -> Observable<Position> {
return Observable.create { observer in
let request = DronecodeSdk_Rpc_Telemetry_SubscribePositionRequest()
do {
let call = try self.service.subscribePosition(request, completion: { (callResult) in
if callResult.statusCode == .ok || callResult.statusCode == .cancelled {
observer.onCompleted()
} else {
observer.onError(RuntimeTelemetryError(callResult.statusMessage!))
}
})
DispatchQueue.init(label: "DronecodePositionReceiver").async {
while let responseOptional = try? call.receive(), let response = responseOptional {
observer.onNext(Position.translateFromRpc(response.position))
}
}
return Disposables.create {
call.cancel()
}
} catch {
observer.onError(error)
return Disposables.create()
}
}
.subscribeOn(scheduler)
}
I now try to test this code with the function below. Again, the first paragraph is only about setting the gRPC context. What I believe is important is that:
TestScheduler
scheduler
is passed to Telemetry
, and is used as subscribeOn(scheduler)
above)sleep(2)
before the assertsfunc checkPositionObservableReceivesEvents(positions: [DronecodeSdk_Rpc_Telemetry_Position]) {
let fakeService = DronecodeSdk_Rpc_Telemetry_TelemetryServiceServiceTestStub()
let fakeCall = DronecodeSdk_Rpc_Telemetry_TelemetryServiceSubscribePositionCallTestStub()
fakeCall.outputs.append(contentsOf: positions.map{ position in createPositionResponse(position: position) })
fakeService.subscribePositionCalls.append(fakeCall)
let expectedEvents = positions.map{ position in next(1, translateRPCPosition(positionRPC: position)) }
let scheduler = TestScheduler(initialClock: 0)
let observer = scheduler.createObserver(Telemetry.Position.self)
let telemetry = Telemetry(service: fakeService, scheduler: scheduler)
let _ = telemetry.position.subscribe(observer)
scheduler.start()
sleep(2)
XCTAssertEqual(expectedEvents.count, observer.events.count)
XCTAssertTrue(observer.events.elementsEqual(expectedEvents, by: { (observed, expected) in
observed.value == expected.value
}))
}
If I don't sleep(...)
, my asserts fail and observer.events.count
receives no event. It feels like the assert happens before the events are emitted.
How should I deal with that?
You deal with that by not creating a DispatchQueue inside your create
function. Instead the function needs to accept a Scheduler as a parameter and you use the scheduler to create the async block.
Then in your test, you pass in the test scheduler.
Something like this will do it:
let disposable = self.scheduler.schedule(0, action: { _ in
var cancel = false
while let responseOptional = try? call.receive(), let response = responseOptional, cancel == false {
observer.onNext(Position.translateFromRpc(response.position))
}
return Disposables.create { cancel = true }
})
return Disposables.create {
disposable.dispose()
call.cancel()
}