
Setting backpressure in OperationQueue (or an alternative API, e.g. PromiseKit or the Combine framework)


I have two steps in a processing pipeline that runs over many images:

  • Step 1: Load the image locally (or download it). This step is IO bound.
  • Step 2: Run a machine learning model. This step is CPU/GPU compute bound and single-threaded because the model is big.

How do I limit the number of images held in memory (loaded by step 1) while they wait for step 2? In reactive programming this is called backpressure.

Without backpressure, the output of step 1 can pile up, leading to high memory usage just from holding loaded images.

I guess I could use a semaphore (e.g. with a count of 5) representing roughly the amount of memory I'm willing to give to step 1 (5 pictures). I guess this would make 5 of my background threads block, which is probably a bad thing? (That's a serious question: is it bad to block a background thread, since it consumes resources?)
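The semaphore idea above can be sketched with Dispatch alone. This is a minimal sketch, not a definitive implementation: `FakeImage`, `loadImage(id:)`, `runModel(on:)` and the sleep durations are hypothetical stand-ins for the real loading and model code. In this arrangement a single serial producer queue loads images and is the only thread that ever waits on the semaphore; the model runs on its own serial queue and signals the semaphore when it is done with an image, so at most 5 images are loaded at any time.

```swift
import Dispatch
import Foundation

// Hypothetical stand-ins for the two steps; real code would read or download
// the image (step 1) and run the ML model on it (step 2).
struct FakeImage { let id: Int }

func loadImage(id: Int) -> FakeImage {
    Thread.sleep(forTimeInterval: 0.001)  // simulated IO
    return FakeImage(id: id)
}

func runModel(on image: FakeImage) {
    Thread.sleep(forTimeInterval: 0.002)  // simulated compute
}

let maxLoadedImages = 5
let slots = DispatchSemaphore(value: maxLoadedImages)     // one slot per image allowed in memory
let producerQueue = DispatchQueue(label: "image-loader")  // serial: the only thread that blocks
let modelQueue = DispatchQueue(label: "model")            // serial: the model sees one image at a time
let allDone = DispatchGroup()

let lock = NSLock()
var loadedNotProcessed = 0  // images loaded but not yet released by the model
var peakLoaded = 0
var processedCount = 0

allDone.enter()  // balanced when the producer loop finishes
producerQueue.async {
    for id in 0..<20 {
        slots.wait()  // blocks the producer thread while 5 images are already loaded
        let image = loadImage(id: id)
        lock.lock()
        loadedNotProcessed += 1
        peakLoaded = max(peakLoaded, loadedNotProcessed)
        lock.unlock()

        allDone.enter()
        modelQueue.async {
            runModel(on: image)
            lock.lock()
            loadedNotProcessed -= 1
            processedCount += 1
            lock.unlock()
            slots.signal()  // image released: let the producer load the next one
            allDone.leave()
        }
    }
    allDone.leave()
}

allDone.wait()
print("processed \(processedCount) images, peak loaded at once: \(peakLoaded)")
```

Because the blocking `wait()` happens on one dedicated producer queue rather than on one thread per image, only that one thread sits idle while the buffer is full.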


Solution

  • If you're using Combine, `flatMap(maxPublishers:_:)` can provide the backpressure. `flatMap` creates a publisher for each value it receives, but exerts backpressure once the number of those publishers that haven't completed reaches the specified maximum: it stops requesting new values from upstream until one of them completes.

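    Here's a simplified example, assuming you have the following functions:

    import Combine
    import UIKit

    func loadImage(url: URL) -> AnyPublisher<UIImage, Error> {
       // ...
    }
    
    func doImageProcessing(image: UIImage) -> AnyPublisher<Void, Error> {
       // ...
    }
    
    let urls: [URL] = [...] // many image URLs
    
    // Keep this AnyCancellable alive: nothing runs until sink subscribes,
    // and releasing it cancels the pipeline.
    let processing = urls.publisher
        .setFailureType(to: Error.self) // urls.publisher never fails; match the inner publishers' Error
        .flatMap(maxPublishers: .max(5)) { url in
            loadImage(url: url)
                .flatMap { uiImage in
                    doImageProcessing(image: uiImage)
                }
        }
        .sink(receiveCompletion: { completion in
            // Called once after every image is processed, or on the first error.
            if case .failure(let error) = completion {
                print("Pipeline failed: \(error)")
            }
        }, receiveValue: { _ in })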
    

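    In the example above, the pipeline loads 5 images and starts processing them. The 6th image starts loading only when one of the earlier ones has finished processing, so no more than 5 images are loaded or waiting at any time. Note that up to 5 `doImageProcessing` calls can be in flight at once; if the model must handle one image at a time, `doImageProcessing` itself has to serialize that work (for example, by running it on a serial `DispatchQueue`).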
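To check the bound empirically, here is a self-contained sketch of the same `flatMap(maxPublishers: .max(5))` pipeline with simulated work. It assumes macOS with Combine available (no UIKit); `FakeImage`, the delays, and the `inFlight`/`peakInFlight` counters are hypothetical additions for illustration. `Deferred` wraps each `Future` so the simulated load starts when `flatMap` subscribes rather than when the publisher is created, and the simulated model runs on a serial queue.

```swift
import Combine
import Foundation

// Hypothetical stand-in for UIImage so the sketch runs without UIKit.
struct FakeImage { let id: Int }

let ioQueue = DispatchQueue(label: "io", attributes: .concurrent)
let modelQueue = DispatchQueue(label: "model")  // serial: one model run at a time
let lock = NSLock()
var inFlight = 0      // images loading or loaded but not yet processed
var peakInFlight = 0
var processedIDs: [Int] = []

// Simulated step 1: Deferred makes the work start on subscription, not on creation.
func loadImage(id: Int) -> AnyPublisher<FakeImage, Error> {
    Deferred {
        Future<FakeImage, Error> { promise in
            lock.lock()
            inFlight += 1
            peakInFlight = max(peakInFlight, inFlight)
            lock.unlock()
            ioQueue.asyncAfter(deadline: .now() + 0.005) {
                promise(.success(FakeImage(id: id)))
            }
        }
    }
    .eraseToAnyPublisher()
}

// Simulated step 2: the "model" runs on the serial modelQueue.
func doImageProcessing(image: FakeImage) -> AnyPublisher<Void, Error> {
    Deferred {
        Future<Void, Error> { promise in
            modelQueue.async {
                Thread.sleep(forTimeInterval: 0.002)
                lock.lock()
                inFlight -= 1
                processedIDs.append(image.id)
                lock.unlock()
                promise(.success(()))
            }
        }
    }
    .eraseToAnyPublisher()
}

let finished = DispatchSemaphore(value: 0)
let cancellable = (0..<20).publisher
    .setFailureType(to: Error.self)
    .flatMap(maxPublishers: .max(5)) { id in
        loadImage(id: id)
            .flatMap { image in doImageProcessing(image: image) }
    }
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { _ in })

// Keep the subscription alive until the pipeline completes.
withExtendedLifetime(cancellable) { finished.wait() }
print("processed \(processedIDs.count) images, peak in flight: \(peakInFlight)")
```

Since each counter decrement happens before the inner publisher completes, and `flatMap` only requests another upstream value after an inner publisher completes, `peakInFlight` should never exceed 5.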