Search code examples
rx-swift

Need to sleep for TestScheduler to finish


Summary

I create an Observable that emits events by calling onNext from a DispatchQueue.async, and my corresponding unit test needs to sleep(...) in order to actually receive the events, even though I use the TestScheduler.

Detailed question

I create the following Observable, which reads events from a gRPC stream. I believe that gRPC is not so important here: just note that I call.receive() in a while loop, and feed that to onNext.

private func createPositionObservable() -> Observable<Position> {
    return Observable.create { observer in
        let request = DronecodeSdk_Rpc_Telemetry_SubscribePositionRequest()

        do {
            let call = try self.service.subscribePosition(request, completion: { (callResult) in
                if callResult.statusCode == .ok || callResult.statusCode == .cancelled {
                    observer.onCompleted()
                } else {
                    observer.onError(RuntimeTelemetryError(callResult.statusMessage!))
                }   
            })  

            DispatchQueue.init(label: "DronecodePositionReceiver").async {
                while let responseOptional = try? call.receive(), let response = responseOptional {
                    observer.onNext(Position.translateFromRpc(response.position))
                }   
            }   

            return Disposables.create {
                call.cancel()
            }   
        } catch {
            observer.onError(error)
            return Disposables.create()
        }   
    }   
    .subscribeOn(scheduler)
}

I now try to test this code with the function below. Again, the first paragraph is only about setting the gRPC context. What I believe is important is that:

  1. I use a TestScheduler
  2. I pass this scheduler to gRPC (scheduler is passed to Telemetry, and is used as subscribeOn(scheduler) above)
  3. I sleep(2) before the asserts
func checkPositionObservableReceivesEvents(positions: [DronecodeSdk_Rpc_Telemetry_Position]) {
    let fakeService = DronecodeSdk_Rpc_Telemetry_TelemetryServiceServiceTestStub()
    let fakeCall = DronecodeSdk_Rpc_Telemetry_TelemetryServiceSubscribePositionCallTestStub()
    fakeCall.outputs.append(contentsOf: positions.map{ position in createPositionResponse(position: position) })
    fakeService.subscribePositionCalls.append(fakeCall)
    let expectedEvents = positions.map{ position in next(1, translateRPCPosition(positionRPC: position)) }

    let scheduler = TestScheduler(initialClock: 0)
    let observer = scheduler.createObserver(Telemetry.Position.self)
    let telemetry = Telemetry(service: fakeService, scheduler: scheduler)

    let _ = telemetry.position.subscribe(observer)
    scheduler.start()

    sleep(2)

    XCTAssertEqual(expectedEvents.count, observer.events.count)
    XCTAssertTrue(observer.events.elementsEqual(expectedEvents, by: { (observed, expected) in
        observed.value == expected.value
    })) 
}

If I don't sleep(...), my asserts fail and observer.events.count receives no event. It feels like the assert happens before the events are emitted.

How should I deal with that?


Solution

  • You deal with that by not creating a DispatchQueue inside your create function. Instead the function needs to accept a Scheduler as a parameter and you use the scheduler to create the async block.

    Then in your test, you pass in the test scheduler.

    Something like this will do it:

    let disposable = self.scheduler.schedule(0, action: { _ in
        var cancel = false
        while let responseOptional = try? call.receive(), let response = responseOptional, cancel == false {
            observer.onNext(Position.translateFromRpc(response.position))
        }
        return Disposables.create { cancel = true }
    })
    
    return Disposables.create {
        disposable.dispose()
        call.cancel()
    }