I was experimenting with RxSwift in a playground and ran into a warning. Here is the complete warning message:
Synchronization anomaly was detected.
- Debugging: To debug this issue you can set a breakpoint in RxSwift/RxSwift/Rx.swift:113 and observe the call stack.
Problem: This behavior is breaking the observable sequence grammar. `next (error | completed)?`
- This behavior breaks the grammar because there is overlapping between sequence events.
Observable sequence is trying to send an event before sending of previous event has finished.
- Interpretation: Two different unsynchronized threads are trying to send some event simultaneously.
This is undefined behavior because the ordering of the effects caused by these events is nondeterministic and depends on the
operating system thread scheduler. This will result in a random behavior of your program.
- Remedy: If this is the expected behavior this message can be suppressed by adding `.observeOn(MainScheduler.asyncInstance)`
or by synchronizing sequence events in some other way.
Here is the playground code:
import RxSwift
import Foundation

example("PublishSubject") {
    let disposeBag = DisposeBag()
    let subject = PublishSubject<String>()

    subject.onNext("🐶")
    subject.subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)

    subject.onNext("🐱")
    subject.onNext("🅰️")
    subject.onNext("🅱️")

    DispatchQueue.global(qos: .utility).async {
        for index in 0...10 {
            subject.onNext("1")
        }
        subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
            print(value)
        }).disposed(by: disposeBag)
    }

    DispatchQueue.global(qos: .utility).async {
        for index in 0...10 {
            subject.onNext("B")
        }
        subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
            print(value)
        }).disposed(by: disposeBag)
    }
}
How can I get rid of this warning? Thanks.
You are sending an event on your subject while it is still in the middle of processing a previous event. That breaks the contract that subjects must maintain: events in an observable sequence must not overlap.
Specifically, subjects do not serialize onNext calls made from different threads, so you have to do it manually. The most obvious way is to put a recursive lock (NSRecursiveLock) around your onNext calls so they don't overlap when they run on separate threads.
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
// One lock shared by every thread that emits on the subject.
let lock = NSRecursiveLock()

subject.onNext("🐶")
subject.subscribe(onNext: { (value) in
    print(value)
}).disposed(by: disposeBag)

subject.onNext("🐱")
subject.onNext("🅰️")
subject.onNext("🅱️")

DispatchQueue.global(qos: .utility).async {
    for _ in 0...10 {
        // Hold the lock for the duration of each emission.
        lock.lock()
        subject.onNext("1")
        lock.unlock()
    }
    subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}

DispatchQueue.global(qos: .utility).async {
    for _ in 0...10 {
        lock.lock()
        subject.onNext("B")
        lock.unlock()
    }
    subject.observeOn(MainScheduler.asyncInstance).subscribe(onNext: { (value) in
        print(value)
    }).disposed(by: disposeBag)
}
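If the lock/unlock pairs start to spread across the code, one option is to route every emission through a single helper. The following is only a minimal sketch of that idea, not part of RxSwift: the `emit` function and the `received` array are hypothetical names introduced for illustration, and the sketch assumes it runs as top-level code (a playground page or `main.swift`) with RxSwift available.

```swift
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
let lock = NSRecursiveLock()

// Hypothetical helper: the only place that calls onNext on the subject.
// `defer` releases the lock even if the function exits early.
func emit(_ value: String) {
    lock.lock()
    defer { lock.unlock() }
    subject.onNext(value)
}

// Collect values instead of printing so the result can be inspected.
// The closure runs synchronously inside emit, so it executes while the lock is held.
var received: [String] = []
subject
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

// Two background producers, as in the question, emitting 11 values each.
let group = DispatchGroup()
for label in ["1", "B"] {
    DispatchQueue.global(qos: .utility).async(group: group) {
        for _ in 0...10 {
            emit(label)
        }
    }
}

// Block until both producers have finished before reading `received`.
group.wait()
print(received.count)
```

The lock is recursive so that a subscriber which itself calls `emit` on the same thread does not deadlock. The relative order of "1" and "B" values is still decided by the thread scheduler; the helper only guarantees that two emissions never overlap.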