Search code examples
swift, rx-swift

How to turn multiple Observables into a Completable?


I'm new to RxSwift and just inherited an old codebase, so please forgive me if this is a silly question.

In the code, data gets updated with the help of Completables and Observables. Below are two methods that loosely illustrate how that's done (obfuscated a bit for privacy purposes):

// note: `getNewData()` returns an Observable
/// Fetches new data, wraps the first emitted object in a `DataRepresentation`,
/// and hands it to the storage manager.
///
/// - Returns: The `Completable` produced by `storageManager.save(data:)` for the
///   first element emitted by `getNewData()`.
func refreshData() -> Completable {
    // Only the first element is used: `take(1)` ends the stream after it so that
    // `asSingle()` sees exactly one element.
    let firstRepresentation = dataManager.getNewData()
        .map { object in DataRepresentation(fromObject: object) }
        .take(1)
        .asSingle()

    return firstRepresentation.flatMapCompletable { representation in
        self.storageManager.save(data: representation)
    }
}

// STORAGE MANAGER

/// Persists `data` and reports the outcome as a `Completable`.
///
/// - Parameter data: The representation to store.
/// - Returns: A completed `Completable` when the save succeeds, or one that
///   fails with the caught error otherwise.
func save(data: DataRepresentation) -> Completable {
    let outcome: Completable
    do {
        // PSEUDOCODE: save the data, emit an event about it if necessary.
        outcome = Completable.completed()
    } catch {
        // Surface the failure through the reactive chain instead of throwing.
        outcome = Completable.error(error)
    }
    return outcome
}

So, my question is this: let's assume getNewData() allows me to pass in some parameters so that I don't just get the same data back every time. Moreover, let's say I want to call that method n times, wait for all the calls to come back, and then still return a Completable from refreshData() (so as not to need to change its signature). Is that sort of thing possible? I was looking into .zip but I'm not sure if it applies here. Thanks.


Solution

  • Here is an updated solution based on all the comments:

    /// Fetches one payload per input from a `DataManager` and saves all of them
    /// with a single `StorageManager.save` call.
    final class Example {
        let dataManager: DataManager
        let storageManager: StorageManager

        init(dataManager: DataManager, storageManager: StorageManager) {
            self.dataManager = dataManager
            self.storageManager = storageManager
        }

        /// Requests new data for every input, waits until each request has produced
        /// a value, and then saves the converted values together.
        ///
        /// - Returns: A `Completable` that finishes when the save finishes, or fails
        ///   with the first error emitted by any request or by the save itself.
        func refreshData() -> Completable {
            let myInputs = [1,2,3]
            // `getNewData` is invoked for every input right here, when `refreshData()`
            // is called; the returned observables are subscribed to later, by `zip`.
            return Observable.zip(myInputs.map(dataManager.getNewData))
                .map { $0.map(DataRepresentation.init(fromObject:)) }
                // Keep only the first combined result. Without this, `asSingle()`
                // fails if the sources ever produce a second set of values and never
                // resolves if they do not complete.
                .take(1)
                .asSingle()
                // Capture the storage manager directly so the chain does not retain `self`.
                .flatMapCompletable { [storageManager] representations in
                    storageManager.save(representations)
                }
        }
    }
    
    /// Source of new data, modelled as a struct holding a closure so that a
    /// replacement closure can be injected (the test harness does this).
    struct DataManager {
        /// Takes an `Int` input and returns an observable of the `Data` for it.
        let getNewData: (Int) -> Observable<Data>
    }
    
    /// Destination for fetched data, modelled as a struct holding a closure so
    /// that a replacement closure can be injected (the test harness does this).
    struct StorageManager {
        /// Takes the representations to store and returns a `Completable` for the save.
        let save: ([DataRepresentation]) -> Completable
    }
    
    /// Value wrapper around a piece of fetched `Data`.
    /// `Equatable` lets arrays of representations be compared with `XCTAssertEqual`.
    struct DataRepresentation: Equatable {
        /// The wrapped data; the name gives the memberwise initializer the
        /// `init(fromObject:)` label.
        let fromObject: Data
    }
    

    Here is a test harness showing that it works (with extensive comments):

    /// Marble-style tests for `Example.refreshData()`, run in virtual time on a `TestScheduler`.
    ///
    /// Test payloads are built with `Data("A".utf8)`, which cannot fail, so no force
    /// unwrapping is needed.
    final class ExampleTests: XCTestCase {
        let scheduler = TestScheduler(initialClock: 0)
        // monitors the values sent to DataManager's `getNewData`
        lazy var dataManagerArgs = scheduler.createObserver(Int.self)
        // monitors the values sent to StorageManager's `save`.
        lazy var storageManagerArgs = scheduler.createObserver([DataRepresentation].self)
        // expect to receive an array of three `DataRepresentation` objects one second after the completable is
        //   subscribed to
        let expectedStorageManagerArgs = parseTimeline(
            "-*",
            values: [
                "*": [
                    DataRepresentation(fromObject: Data("A".utf8)),
                    DataRepresentation(fromObject: Data("B".utf8)),
                    DataRepresentation(fromObject: Data("C".utf8))
                ]
            ])
            .offsetTime(by: 200)

        /// Every fetch succeeds and the save succeeds: the completable completes.
        func test_happy_path() {
            // this DataManager sends either "A", "B", or "C" and then a completed event for each subscription.
            let dataManager = createDataManager(timeline: { "-\($0)|" })

            // this StorageManager sends a completed event after it's subscribed to.
            let storageManager = createStorageManager(timeline: { _ in "-|" })

            let sut = Example(dataManager: dataManager, storageManager: storageManager)
            let result = scheduler.start {
                sut.refreshData() // call `refreshData()` and record the result of the Completable event.
            }

            // expect to receive all three values as `refreshData` is called.
            let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
            XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

            XCTAssertEqual(storageManagerArgs.events, expectedStorageManagerArgs[0])

            // expect for the `refreshData` completable to complete after the `save` completes.
            let expectedResult = parseTimeline("--|", values: { _ in fatalError() }).offsetTime(by: 200)
            XCTAssertEqual(result.events, expectedResult[0])
        }

        /// One fetch fails: nothing is saved and the completable emits the error.
        func test_getNewData_failure() {
            // this DataManager sends either "A", or "C", and then a completed event for subscriptions 1 and 3.
            //   it sends an error for subscription 2
            let dataManager = createDataManager(timeline: { $0 == 2 ? "-#" : "-\($0)|" })

            // this StorageManager sends a completed event after it's subscribed to.
            let storageManager = createStorageManager(timeline: { _ in "-|" })

            let sut = Example(dataManager: dataManager, storageManager: storageManager)
            let result = scheduler.start {
                sut.refreshData() // call `refreshData()` and record the result of the Completable event.
            }

            // expect to receive all three values as `refreshData` is called.
            let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
            XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

            // a single error from `getNewData` means no save events.
            XCTAssertEqual(storageManagerArgs.events, [])

            // expect for the `refreshData` completable to emit an error when the getNewData fails.
            let expectedResult = parseTimeline("-#", values: { _ in fatalError() }).offsetTime(by: 200)
            XCTAssertEqual(result.events, expectedResult[0])
        }

        /// Every fetch succeeds but the save fails: the completable emits the save's error.
        func test_save_error() {
            // this DataManager sends either "A", "B", or "C" and then a completed event for each subscription.
            let dataManager = createDataManager(timeline: { "-\($0)|" })

            // this StorageManager sends an error one second after it's subscribed to.
            let storageManager = createStorageManager(timeline: { _ in "-#" })

            let sut = Example(dataManager: dataManager, storageManager: storageManager)
            let result = scheduler.start {
                sut.refreshData() // call `refreshData()` and record the result of the Completable event.
            }

            // expect to receive all three values as `refreshData` is called.
            let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
            XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

            XCTAssertEqual(storageManagerArgs.events, expectedStorageManagerArgs[0])

            // expect for the `refreshData` completable to emit an error when the save fails.
            let expectedResult = parseTimeline("--#", values: { _ in fatalError() }).offsetTime(by: 200)
            XCTAssertEqual(result.events, expectedResult[0])
        }

        /// Builds a `DataManager` whose `getNewData` is a scheduler mock that reports
        /// its arguments to `dataManagerArgs`.
        ///
        /// - Parameter timeline: Chooses the marble timeline string for a given input.
        func createDataManager(timeline: @escaping (Int) -> String) -> DataManager {
            DataManager(getNewData: scheduler.mock(
                args: dataManagerArgs,
                values: ["1": Data("A".utf8), "2": Data("B".utf8), "3": Data("C".utf8)],
                timelineSelector: timeline
            ))
        }

        /// Builds a `StorageManager` whose `save` is a scheduler mock that reports its
        /// arguments to `storageManagerArgs`.
        ///
        /// - Parameter timeline: Chooses the marble timeline string for a given save argument.
        func createStorageManager(timeline: @escaping ([DataRepresentation]) -> String) -> StorageManager {
            StorageManager(save: { [scheduler, storageManagerArgs] value in
                // The mock yields an observable; drop its elements to expose it as a Completable.
                scheduler.mock(args: storageManagerArgs, values: ["N": ()], timelineSelector: timeline)(value)
                    .ignoreElements()
                    .asCompletable()
            })
        }
    }