Search code examples
swiftcombine

Swift Combine: How to create a single publisher from a list of publishers?


Using Apple's new Combine framework I want to make multiple requests from each element in a list. Then I want a single result from a reduction of all the the responses. Basically I want to go from list of publishers to a single publisher that holds a list of responses.

I've tried making a list of publishers, but I don't know how to reduce that list into a single publisher. And I've tried making a publisher containing a list but I can't flat map a list of publishers.

Please look at the "createIngredients" function

func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
            .eraseToAnyPublisher()
}

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    // first attempt
    let results = ingredients
            .map(createIngredient)
    // results = [AnyPublisher<CreateIngredientMutation.Data, Error>]

    // second attempt
    return Publishers.Just(ingredients)
            .eraseToAnyPublisher()
            .flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
                return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
            }
}

I'm not sure how to take an array of publishers and convert that to a publisher containing an array.

Result value of type '[AnyPublisher]' does not conform to closure result type 'Publisher'


Solution

  • Essentially, in your specific situation you're looking at something like this:

    func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
        Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
            .collect()
            .eraseToAnyPublisher()
    }
    

    This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.

    Bear in mind, if one of the upstream publishers fails – or produces more than one result – the number of elements may not match the number of subscribers, so you may need additional operators to mitigate this depending on your situation.

    The more generic answer, with a way you can test it using the EntwineTest framework:

    import XCTest
    import Combine
    import EntwineTest
    
    final class MyTests: XCTestCase {
        
        func testCreateArrayFromArrayOfPublishers() {
    
            typealias SimplePublisher = Just<Int>
    
            // we'll create our 'list of publishers' here. Each publisher emits a single
            // Int and then completes successfully – using the `Just` publisher.
            let publishers: [SimplePublisher] = [
                SimplePublisher(1),
                SimplePublisher(2),
                SimplePublisher(3),
            ]
    
            // we'll turn our array of publishers into a single merged publisher
            let publisherOfPublishers = Publishers.MergeMany(publishers)
    
            // Then we `collect` all the individual publisher elements results into
            // a single array
            let finalPublisher = publisherOfPublishers.collect()
    
            // Let's test what we expect to happen, will happen.
            // We'll create a scheduler to run our test on
            let testScheduler = TestScheduler()
    
            // Then we'll start a test. Our test will subscribe to our publisher
            // at a virtual time of 200, and cancel the subscription at 900
            let testableSubscriber = testScheduler.start { finalPublisher }
    
            // we're expecting that, immediately upon subscription, our results will
            // arrive. This is because we're using `just` type publishers which
            // dispatch their contents as soon as they're subscribed to
            XCTAssertEqual(testableSubscriber.recordedOutput, [
                (200, .subscription),            // we're expecting to subscribe at 200
                (200, .input([1, 2, 3])),        // then receive an array of results immediately
                (200, .completion(.finished)),   // the `collect` operator finishes immediately after completion
            ])
        }
    }