I've noticed that my Combine pipeline sometimes gets "stuck" when I use "DispatchQueue.global()" as the scheduler for "delay".
for i in 0..<100 {
    let scheduler = DispatchQueue.global()
    let timeoutPublisher = Just<Int>(0)
        .delay(for: .seconds(0.1), scheduler: scheduler)
    let anotherJust = Just(1)
    var ok = false
    self.cancellable = anotherJust
        .merge(with: timeoutPublisher)
        .sink { [weak self] state in
            if state == 0 {
                ok = true
            }
        }
    // Give the delayed publisher time to emit before checking the result.
    try? await Task.sleep(for: .seconds(1))
    // ...
}
In this code snippet, "ok" is sometimes never set to true. It only happens occasionally, so I suspect a threading issue. When the pipeline gets into that state, the value "0" is never delivered to the sink.
When I switch the scheduler to "DispatchQueue.main", it always works.
Could anyone explain what I am missing here?
The problem has to do with the timing and when the pipelines publish their "finish" events.
I took your code and wrapped it in an XCTestCase:
import Combine
import XCTest

class TestDelay : XCTestCase {
    func testDelayOperator() {
        print("running the test")
        let scheduler = DispatchQueue.global()
        let timeoutPublisher = Just(0)
            .print("just 0")
            .delay(for: .seconds(0.1), scheduler: scheduler)
        let expectZero = expectation(description: "Expect 0 to be received.")
        // Keep a reference so the subscription stays alive until the wait finishes.
        let cancellable = Just(1)
            .merge(with: timeoutPublisher)
            .print("just 1")
            .sink { completion in
                switch completion {
                case .finished:
                    print("finished")
                case .failure(_):
                    print("failure")
                }
            } receiveValue: { state in
                print("State is \(state)")
                if state == 0 {
                    expectZero.fulfill()
                }
            }
        wait(for: [expectZero], timeout: 1.0)
    }
}
Then I started a timer that fires once per second and reran the test on each tick, watching the output until the test failed:
var cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
    .autoconnect()
    .sink { _ in
        TestDelay.defaultTestSuite.run()
    }
Here's the transcript from a successful test:
Test Suite 'TestDelay' started at 2024-01-29 15:01:17.261.
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
running the test
just 1: receive subscription: (Merge)
just 1: request unlimited
just 1: receive value: (1)
State is 1
just 0: receive subscription: (Just)
just 0: request unlimited
just 0: receive value: (0)
just 0: receive finished
just 1: receive value: (0)
State is 0
just 1: receive finished
finished
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' passed (0.103 seconds).
Test Suite 'TestDelay' passed at 2024-01-29 15:01:17.364.
Executed 1 test, with 0 failures (0 unexpected) in 0.103 (0.103) seconds
You can see that the "just 1" pipeline received the value 0 before it received its own finished event, so the test succeeded.
Here's an example of a failed test:
Test Suite 'TestDelay' started at 2024-01-29 15:01:12.263.
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
running the test
just 1: receive subscription: (Merge)
just 1: request unlimited
just 1: receive value: (1)
State is 1
just 0: receive subscription: (Just)
just 0: request unlimited
just 0: receive value: (0)
just 0: receive finished
just 1: receive finished
finished
<unknown>:0: error: -[__lldb_expr_96.TestDelay testDelayOperator] : Asynchronous wait failed: Exceeded timeout of 1 seconds, with unfulfilled expectations: "Expect 0 to be received.".
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' failed (1.021 seconds).
Test Suite 'TestDelay' failed at 2024-01-29 15:01:13.287.
Executed 1 test, with 1 failure (0 unexpected) in 1.021 (1.024) seconds
Notice that both finished events arrived before the "just 1" pipeline received the value 0. Once a pipeline has finished it delivers nothing further, so the 0 was dropped.
The global dispatch queues are concurrent, not serial. I suspect that the delay operator schedules separate blocks for the "publish a 0" event and the "publish a finish" event, and on a concurrent queue those blocks can run in parallel. Your test succeeds or fails depending on which of them gets delivered first. The main queue is a serial queue, so the problem goes away when you use it. Try using a serial queue that you create yourself as the scheduler for your delay and see if that resolves your problem.
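As a rough sketch of that suggestion, the snippet below uses a private serial queue as the scheduler for delay. The queue label "com.example.delay-scheduler" and the names serialQueue, receivedValues and finished are made up for illustration, and the semaphore is only there so a command-line script waits for the pipeline to complete. I would expect the 0 to arrive before the finished event here, but treat that as an assumption to verify rather than a documented guarantee.

```swift
import Combine
import Dispatch
import Foundation

// Hypothetical label; a DispatchQueue created without the .concurrent
// attribute is serial, so its blocks run one at a time.
let serialQueue = DispatchQueue(label: "com.example.delay-scheduler")

var receivedValues: [Int] = []
let finished = DispatchSemaphore(value: 0)

let timeoutPublisher = Just(0)
    .delay(for: .seconds(0.1), scheduler: serialQueue)

// Keep the cancellable alive until the pipeline has completed.
let cancellable = Just(1)
    .merge(with: timeoutPublisher)
    .sink { _ in
        // Called once both upstream publishers have finished.
        finished.signal()
    } receiveValue: { value in
        receivedValues.append(value)
    }

// Block the calling thread until completion arrives, or give up after 1 second.
_ = finished.wait(timeout: .now() + 1)
print(receivedValues)
```

If the printed array contains both 1 and 0 on every run, the reordering was most likely caused by the concurrent global queue.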