
Combine: Using DispatchQueue.global() as a delay scheduler causes missing published values


I've noticed that my Combine pipeline sometimes gets "stuck" when I use "DispatchQueue.global()" as the scheduler for "delay".

for i in 0..<100 {
    let scheduler = DispatchQueue.global()

    let timeoutPublisher = Just<Int>(0)
        .delay(for: .seconds(0.1), scheduler: scheduler)

    let anotherJust = Just(1)
    var ok = false

    self.cancellable = anotherJust
        .merge(with: timeoutPublisher)
        .sink { state in
            if state == 0 {
                ok = true
            }
        }

    // Give the delayed 0 plenty of time to arrive.
    try? await Task.sleep(for: .seconds(1))
    // ...
}

In this snippet, "ok" is occasionally never set to true. Because it only happens intermittently, I suspect a threading issue. When it does happen, the value "0" is never delivered to the sink.

When I switch the scheduler to "DispatchQueue.main", it always works.

Could anyone explain what I'm missing here?


Solution

  • The problem has to do with timing: specifically, with when each pipeline publishes its "finished" event.

    I took your code and wrapped it in an XCTestCase:

    import Combine
    import XCTest

    class TestDelay : XCTestCase {
      func testDelayOperator() {
        print("running the test")
        let scheduler = DispatchQueue.global()
    
        let timeoutPublisher = Just(0)
          .print("just 0")
          .delay(for: .seconds(0.1), scheduler: scheduler)
    
        let expectZero = expectation(description: "Expect 0 to be received.")
        let cancellable = Just(1)
          .merge(with: timeoutPublisher)
          .print("just 1")
          .sink { completion in
            switch completion {
            case .finished:
              print("finished")
            case .failure(_):
              print("failure")
            }
          } receiveValue: { state in
            print("State is \(state)")
            if state == 0 {
              expectZero.fulfill()
            }
          }
    
        wait(for: [expectZero], timeout: 1.0)
        // The subscription stays alive until here; tear it down explicitly.
        cancellable.cancel()
      }
    }
    

    Then I set up a timer that runs the test once per second and watched the output until the test failed:

    // Run the test suite once per second; keep a reference so the timer keeps firing.
    let cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .sink { _ in
        TestDelay.defaultTestSuite.run()
      }
    

    Here's the transcript from a successful test:

    Test Suite 'TestDelay' started at 2024-01-29 15:01:17.261.
    Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
    running the test
    just 1: receive subscription: (Merge)
    just 1: request unlimited
    just 1: receive value: (1)
    State is 1
    just 0: receive subscription: (Just)
    just 0: request unlimited
    just 0: receive value: (0)
    just 0: receive finished
    just 1: receive value: (0)
    State is 0
    just 1: receive finished
    finished
    Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' passed (0.103 seconds).
    Test Suite 'TestDelay' passed at 2024-01-29 15:01:17.364.
         Executed 1 test, with 0 failures (0 unexpected) in 0.103 (0.103) seconds
    

    You can see that the "just 1" pipeline received the value 0 before its own finished event, so the test succeeded.

    Here's an example of a failed test:

    Test Suite 'TestDelay' started at 2024-01-29 15:01:12.263.
    Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
    running the test
    just 1: receive subscription: (Merge)
    just 1: request unlimited
    just 1: receive value: (1)
    State is 1
    just 0: receive subscription: (Just)
    just 0: request unlimited
    just 0: receive value: (0)
    just 0: receive finished
    just 1: receive finished
    finished
    <unknown>:0: error: -[__lldb_expr_96.TestDelay testDelayOperator] : Asynchronous wait failed: Exceeded timeout of 1 seconds, with unfulfilled expectations: "Expect 0 to be received.".
    Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' failed (1.021 seconds).
    Test Suite 'TestDelay' failed at 2024-01-29 15:01:13.287.
         Executed 1 test, with 1 failure (0 unexpected) in 1.021 (1.024) seconds
    

    Notice that both finished events arrived, but the "just 1" pipeline never received the value 0: the delayed completion got through and the delayed 0 never did.
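    The failed run is what you would expect if the delayed value and the delayed completion travel as two independent blocks that race each other. As a rough illustration without Combine, here is a minimal sketch that schedules a "value" block and then a "finished" block for (almost) the same deadline on the global queue and counts how often "finished" runs first. The names "finishedRanFirst(on:delay:)" and "EventLog" are hypothetical, and the idea that "delay" works this way is an assumption, not documented Combine behavior. The count may well be zero on a given machine or run; any nonzero count shows that the order is not guaranteed.

    ```swift
    import Foundation

    /// Hypothetical thread-safe log of which block ran first.
    final class EventLog: @unchecked Sendable {
        private let lock = NSLock()
        private var events: [String] = []

        func append(_ event: String) {
            lock.lock()
            events.append(event)
            lock.unlock()
        }

        var first: String? {
            lock.lock()
            defer { lock.unlock() }
            return events.first
        }
    }

    /// Hypothetical helper: schedules a "value" block, then a "finished" block,
    /// each `delay` seconds from now on `queue` (mimicking, by assumption, a delay
    /// operator that schedules every event separately). Returns true if the
    /// "finished" block happened to run first.
    func finishedRanFirst(on queue: DispatchQueue, delay: Double = 0.01) -> Bool {
        let group = DispatchGroup()
        let log = EventLog()

        for event in ["value", "finished"] {
            group.enter()
            queue.asyncAfter(deadline: .now() + delay) {
                log.append(event)
                group.leave()
            }
        }

        group.wait() // block until both scheduled blocks have run
        return log.first == "finished"
    }

    let trials = 200
    let reordered = (0..<trials).filter { _ in finishedRanFirst(on: .global()) }.count
    print("global queue: \"finished\" ran first in \(reordered) of \(trials) trials")
    ```
    
    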

    The global dispatch queues are concurrent, not serial. I suspect that the delay operator schedules separate blocks for the "publish a 0" event and the "publish finished" event, and on a concurrent queue those blocks can run in either order. Your test succeeds or fails depending on which one is delivered first: if the finished event wins, the merged pipeline completes and the late 0 is never delivered. The main queue is a serial queue, so the problem goes away there. Try using a serial queue that you create yourself (for example with "DispatchQueue(label:)") as the scheduler for your delay and see if that resolves your problem.
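    To try that suggestion outside XCTest, here is a minimal sketch that runs the question's pipeline repeatedly, once against the global queue and once against a serial queue you create yourself, and counts how often the 0 arrives. The helper "receivedZero(on:timeout:)" and the queue label are hypothetical. Counts will vary between runs, and I'm not aware of documentation guaranteeing that a serial queue preserves the order of two delayed events, so treat the result as an experiment rather than proof.

    ```swift
    import Combine
    import Foundation

    /// Hypothetical helper: builds the question's pipeline with `scheduler` as the
    /// delay scheduler and reports whether the delayed 0 reached the sink
    /// within `timeout` seconds.
    func receivedZero(on scheduler: DispatchQueue, timeout: Double = 0.5) -> Bool {
        let gotZero = DispatchSemaphore(value: 0)

        let timeoutPublisher = Just(0)
            .delay(for: .seconds(0.1), scheduler: scheduler)

        let cancellable = Just(1)
            .merge(with: timeoutPublisher)
            .sink { state in
                if state == 0 {
                    gotZero.signal() // the 0 made it through
                }
            }

        // Block the calling thread (not the scheduler) until the 0 arrives or we give up.
        let result = gotZero.wait(timeout: .now() + timeout)
        cancellable.cancel()
        return result == .success
    }

    let iterations = 50
    let globalHits = (0..<iterations).filter { _ in receivedZero(on: .global()) }.count

    // DispatchQueue(label:) creates a serial queue by default.
    let serialQueue = DispatchQueue(label: "com.example.delay-scheduler") // hypothetical label
    let serialHits = (0..<iterations).filter { _ in receivedZero(on: serialQueue) }.count

    print("global queue: 0 received in \(globalHits) of \(iterations) runs")
    print("serial queue: 0 received in \(serialHits) of \(iterations) runs")
    ```
    
    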