I've got a question about the zip
operator in Combine in combination with backpressure.
Take the following code snippet:
let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()
let handle = subject
.zip(sequencePublisher.print())
.print()
.sink { letters, digits in
print(letters, digits)
}
subject.send("a")
When executing this in the playground, the following is the output:
receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...
When executing it on an iOS device, the code crashes after a few seconds because of memory issues.
The underlying reason can be seen in the fourth line above where zip
requests an unlimited amount of values from the sequencePublisher
. Since the sequencePublisher
provides the whole range of Int
values, this causes memory overflows.
What I think to know:
zip
waits for one value of each publisher before combining them and pushing them on My expectation would be that zip
only requests one value from each publisher, waits for them to arrive and only requests the next values when it received one from each.
In this particular case I tried to build a behaviour where a sequence number is assigned to every value that is produced by the subject
. However, I could imagine that this is always a problem when zip
combines values from publishers that publish with very different frequencies.
Utilising backpressure in the zip
operator seems to be perfect tool to solve that issue. Do you know why this isn't the case? Is this a bug or intentional? If intentional, why?
Thanks guys
It seems that the Sequence publisher is just unrealistic. It doesn't seem to respond to back pressure; it just spews out the whole sequence at once, which makes no sense in a world where publication is supposed to be asynchronous. If you change Int.max
to 3 there's no problem. :) I don't know whether it's a bug or just a flaw in the whole notion of a Sequence publisher.
However, there's really no issue for your actual use case, because there is a much better way to assign a successive number to each emission from the Subject, namely scan
.
Here's a more realistic approach:
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController : UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let subject = PassthroughSubject<String, Never>()
subject.scan(("",0)) {t,s in (s,t.1+1)}
.sink { print($0.0, $0.1)
}.store(in:&storage)
delay(1) {
subject.send("a") // a 1
delay(1) {
subject.send("b") // b 2
}
}
}
}
That assumes that you have some other reason for needing each successive enumeration to pass down thru the pipeline. But if your only goal is to enumerate each signal as it arrives into the sink
itself, you can just have the sink
itself maintain a counter (which it can easily do, because it is a closure):
var storage = Set<AnyCancellable>()
let subject = PassthroughSubject<String, Never>()
override func viewDidLoad() {
super.viewDidLoad()
var counter = 1
subject
.sink {print($0, counter); counter += 1}
.store(in:&storage)
delay(1) {
self.subject.send("a") // a 1
self.subject.send("b") // b 2
}
}