I'm trying to use Combine to do several millions concurrent request through the network. Here is a mock up of the naive approach I'n using:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
This pipeline first creates a request, the the request is done in flatMap
to confine errors. Also, flatMap
merges several requests to they are effectively done concurrently, which is great.
The issue is that it will literally make 1,000,000 requests concurrently, so I added the parameter maxPublishers
which limits the number of publishers that are subscribed at the same time in flatMap
. This kind of work, only 32 publishers are active at the same time, but unfortunately some_preprocessing
will still be performed 1,000,000 times before flatMap
will be executed.
I expected flatMap(maxPublishers: .max(32))
to apply some back pressure, i.e. only requesting items from the upstream publisher map
when maxPublishers
< 32. This does not seem to be the case, and it fills up the RAM rapidly and delays the processing.
I then tried to use the buffer
operator that is used to introduce back pressure between a producer and a consumer, but Apple documentation is so poor I don't understand its functioning (more specifically the prefechStrategy
argument).
So I tried different combinations such as:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.map(some_preprocessing)
.buffer(size: 32, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool
sleep(100)
This does not seem to do anything useful though, flatMap
still requests as much element as it can.
How to properly apply back pressure in this case? I.e I need the upstream map
publisher to "wait" for demand asked by the downstream publisher flatMap
, which should only ask items when it as an empty slot.
The issue appears to be a Combine bug, as pointed out here. Using Publishers.Sequence
causes the following operator to accumulate every value sent downstream before proceeding.
A workaround is to type-erase the sequence publisher:
import Foundation
import Combine
let cancellable = (0..<1_000_000).publisher
.eraseToAnyPublisher() // <----
.map(some_preprocessing)
.flatMap(maxPublishers: .max(32)) { request in
URLSession.dataTaskPublisher(for: request)
.map(\.data)
.catch { _ in
return Just(Data())
}
}
.sink { completion in
print(completion)
} receiveValue: { value in
print(value)
}
// Required in a command line tool without running loop
sleep(.max)