
How to apply back pressure with the Combine buffer operator to keep flatMap from requesting unlimited demand upstream?


I'm trying to use Combine to make several million concurrent requests over the network. Here is a mock-up of the naive approach I'm using:

import Foundation
import Combine
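
// Hypothetical stand-in for the real preprocessing step (not from the original
// question): turns an index into a request.
func some_preprocessing(_ index: Int) -> URLRequest {
  URLRequest(url: URL(string: "https://example.com/items/\(index)")!)
}
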
let cancellable = (0..<1_000_000).publisher
  .map(some_preprocessing)
  .flatMap(maxPublishers: .max(32)) { request in
    URLSession.shared.dataTaskPublisher(for: request)
      .map(\.data)
      .catch { _ in
        return Just(Data())
      }
  }
  .sink { completion in
    print(completion)
  } receiveValue: { value in
    print(value)
  }

// Required in a command line tool
sleep(100)

This pipeline first creates a request, then performs it inside flatMap so that errors stay confined to each request. flatMap also merges the requests, so they effectively run concurrently, which is great.

The issue is that it will literally make 1,000,000 requests concurrently, so I added the maxPublishers parameter, which limits the number of publishers flatMap is subscribed to at the same time. This kind of works: only 32 publishers are active at once. Unfortunately, some_preprocessing is still performed 1,000,000 times before flatMap executes.
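The concurrency limit on its own can be checked without the network. The sketch below is an illustration under assumptions: a `Just` value delayed on a serial `DispatchQueue` stands in for `dataTaskPublisher`, and the `Stats` helper, the queue label and the numbers (20 elements, `.max(4)`) are hypothetical. It counts how many inner publishers are subscribed at the same time; with `maxPublishers: .max(4)` the peak should not exceed 4.

```swift
import Combine
import Foundation

// Hypothetical helper: counts active inner publishers and received values.
// A lock guards the counters because values are delivered on a background queue.
final class Stats {
    private let lock = NSLock()
    private var active = 0
    private(set) var peak = 0
    private(set) var received = 0

    func enter() {
        lock.lock(); defer { lock.unlock() }
        active += 1
        peak = max(peak, active)
    }

    func leave() {
        lock.lock(); defer { lock.unlock() }
        active -= 1
    }

    func record() {
        lock.lock(); defer { lock.unlock() }
        received += 1
    }
}

let stats = Stats()
let done = DispatchSemaphore(value: 0)
let queue = DispatchQueue(label: "example.delay")  // hypothetical serial queue

let cancellable = (0..<20).publisher
    .flatMap(maxPublishers: .max(4)) { value in
        // Stand-in for a network request: emit the value after 20 ms.
        Just(value)
            .delay(for: .milliseconds(20), scheduler: queue)
            .handleEvents(
                receiveSubscription: { _ in stats.enter() },
                receiveCompletion: { _ in stats.leave() }
            )
    }
    .sink(
        receiveCompletion: { _ in done.signal() },
        receiveValue: { _ in stats.record() }
    )

done.wait()  // block the main thread until the pipeline completes
print("peak concurrent inner publishers:", stats.peak)
```

Because deliveries happen on `queue` rather than the main thread, blocking on the semaphore does not deadlock the pipeline.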

I expected flatMap(maxPublishers: .max(32)) to apply some back pressure, i.e. to request items from the upstream map publisher only when fewer than 32 inner publishers are active. This does not seem to be the case: memory fills up rapidly and processing is delayed.
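To see what flatMap actually asks of its upstream, `handleEvents(receiveRequest:)` can record every demand that passes through that point. In this hedged sketch `Just` stands in for the per-request network publisher and `upstreamDemands` is a hypothetical name. I would expect an initial demand of `.max(4)` followed by small top-up requests, never `.unlimited`; if that holds, flatMap is applying back pressure and the eager work is happening upstream of it.

```swift
import Combine

// Every demand that flatMap sends to its upstream is appended here.
var upstreamDemands: [Subscribers.Demand] = []

let cancellable = (0..<100).publisher
    .handleEvents(receiveRequest: { demand in
        upstreamDemands.append(demand)
    })
    .flatMap(maxPublishers: .max(4)) { value in
        Just(value)  // stand-in for the per-request publisher
    }
    .sink { _ in }

// Show the first few recorded demands.
print(upstreamDemands.prefix(5))
```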

I then tried the buffer operator, which is meant to introduce back pressure between a producer and a consumer, but Apple's documentation is so sparse that I don't understand how it works (more specifically, the prefetch argument and its PrefetchStrategy values).

So I tried different combinations such as:

import Foundation
import Combine

let cancellable = (0..<1_000_000).publisher
  .map(some_preprocessing)
  .buffer(size: 32, prefetch: .byRequest, whenFull: .dropNewest)
  .flatMap(maxPublishers: .max(32)) { request in
    URLSession.shared.dataTaskPublisher(for: request)
      .map(\.data)
      .catch { _ in
        return Just(Data())
      }
  }
  .sink { completion in
    print(completion)
  } receiveValue: { value in
    print(value)
  }

// Required in a command line tool
sleep(100)

This does not seem to do anything useful, though: flatMap still requests as many elements as it can.
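One way to take the guesswork out of `prefetch` is to log what `buffer` requests from its upstream under each strategy. Apple's reference describes `.keepFull` as demanding the buffer's size from upstream when the subscriber first connects and then trying to keep the buffer full, and `.byRequest` as avoiding prefetching and requesting on demand. The sketch below (hypothetical helper `bufferDemands`, buffer size 8, `Just` standing in for the network call) records the upstream demand for both strategies so they can be compared on a given OS version.

```swift
import Combine

// Hypothetical helper: records the demand `buffer` sends to its upstream
// for the given prefetch strategy.
func bufferDemands(prefetch: Publishers.PrefetchStrategy) -> [Subscribers.Demand] {
    var demands: [Subscribers.Demand] = []
    let cancellable = (0..<100).publisher
        .handleEvents(receiveRequest: { demands.append($0) })
        .buffer(size: 8, prefetch: prefetch, whenFull: .dropNewest)
        .flatMap(maxPublishers: .max(2)) { Just($0) }
        .sink { _ in }
    cancellable.cancel()  // release the subscription before returning
    return demands
}

print("keepFull: ", bufferDemands(prefetch: .keepFull).prefix(5))
print("byRequest:", bufferDemands(prefetch: .byRequest).prefix(5))
```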

How do I properly apply back pressure in this case? That is, I need the upstream map publisher to "wait" for demand from the downstream flatMap, which should only ask for items when it has an empty slot.


Solution

  • The issue appears to be a Combine bug, as pointed out here. Using Publishers.Sequence causes the following operator to accumulate every value sent downstream before proceeding.

    A workaround is to type-erase the sequence publisher:

    import Foundation
    import Combine
    
    let cancellable = (0..<1_000_000).publisher
      .eraseToAnyPublisher()  // <----
      .map(some_preprocessing)
      .flatMap(maxPublishers: .max(32)) { request in
        URLSession.shared.dataTaskPublisher(for: request)
          .map(\.data)
          .catch { _ in
            return Just(Data())
          }
      }
      .sink { completion in
        print(completion)
      } receiveValue: { value in
        print(value)
      }
    
    // Required in a command line tool without a run loop
    sleep(.max)
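The effect of the workaround can be checked without the network by counting how many times the preprocessing step has run when the first value reaches the sink. In this sketch `preprocess` is a hypothetical stand-in for `some_preprocessing`, `Just` stands in for `dataTaskPublisher`, and 10,000 elements replace the million. With `.eraseToAnyPublisher()` in place, the count at the first value should be small rather than the full 10,000, because map now runs only as elements are demanded; the question reports that without the erasure every element is preprocessed before flatMap runs.

```swift
import Combine

var preprocessedCount = 0             // how often preprocess has run so far
var preprocessedAtFirstValue: Int?    // snapshot taken when the first value arrives
var receivedCount = 0

// Hypothetical stand-in for some_preprocessing.
func preprocess(_ index: Int) -> Int {
    preprocessedCount += 1
    return index * 2
}

let cancellable = (0..<10_000).publisher
    .eraseToAnyPublisher()  // the workaround from the answer
    .map(preprocess)
    .flatMap(maxPublishers: .max(32)) { Just($0) }
    .sink { _ in
        if preprocessedAtFirstValue == nil {
            preprocessedAtFirstValue = preprocessedCount
        }
        receivedCount += 1
    }

print("preprocessed before first value:", preprocessedAtFirstValue ?? -1)
print("total preprocessed / received:", preprocessedCount, receivedCount)
```

Everything in this pipeline is synchronous, so all values have been delivered by the time `sink` returns.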