Search code examples
rx-swiftreactivex

RxSwift multiple observable in map


I ran into a situation where I would fetch an API which will generate json data of registered users. I would then have to loop through each user and fetch their avatar from remote url and save it to disk. I can perform this second task inside subscribe but this is not a best practice. I am trying to implement it with map, flatMap etc.

Here is my sample code:

self.dataManager.getUsers()
            .observeOn(MainScheduler.instance)
            .subscribeOn(globalScheduler)
            .map{ [unowned self] (data) -> Users in
                var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars

                if let cats = users.categories {
                    for cat in cats  {
                        if let profiles = cat.profiles {
                            for profile in profiles {
                                if let thumbnail = profile.thumbnail,
                                    let url = URL(string: thumbnail) {
                                    URLSession.shared.rx.response(request: URLRequest(url: url))
                                        .subscribeOn(MainScheduler.instance)
                                        .subscribe(onNext: { response in
                                            // Update Image
                                            if let img = UIImage(data: response.data) {
                                                try? Disk.save(img, to: .caches, as: url.lastPathComponent)
                                            }
                                        }, onError: { (error) in

                                        }).disposed(by: self.disposeBag)
                                }
                            }
                        }
                    }
                }

                return users
            }
            .subscribe(onSuccess: { [weak self] (users) in

            }).disposed(by: disposeBag)

There are 2 problems in this code. First is with the rx on URLSession which execute the task in background on another thread and there is no way to acknowledge the main subscribe back when this operation will finish. Second is with the loop and rx which is not efficient as it should generate multiple observables and then process it.

Any idea to improve this logic is welcome.


Solution

  • This was a fun puzzle.

    The "special sauce" that solves the problem is in this line:

    .flatMap { 
        Observable.combineLatest($0.map { 
            Observable.combineLatest(
                Observable.just($0.0), 
                URLSession.shared.rx.data(request: $0.1)
                    .materialize()
            ) 
        }) 
    }
    

    The map before the line creates an Observable<[(URL, URLRequest)]> and the line in question converts it to an Observable<[(URL, Event<Data>)]>.

    The line does this by:

    1. Set up the network call to create an Observable<Data>
    2. Materialize it to create an Observable<Event<Data>> (this is done so an error in one download won't shutdown the entire stream.)
    3. Lift the URL back into an Observable which gives us an Observable<URL>
    4. Combine the observables from steps 2 & 3 to produce an Observable<(URL, Event<Data>)>.
    5. Map each array element to produce [Observable<(URL, Event<Data>)>]
    6. Combine the observables in that array to finally produce Observable<[(URL, Event<Data>)]>

    Here is the code

    // manipulatedUsers is for the code you commented out.
    // users: Observable<Users>
    let users = self.dataManager.getUsers()
        .map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
        .asObservable()
        .share(replay: 1)
    
    // this chain is for handling the users object. You left it blank in your code so I did too.
    users
        .observeOn(MainScheduler.instance)
        .subscribe(onNext: { users in
    
        })
        .disposed(by: disposeBag)
    
    // This navigates through the users structure and downloads the images.
    // images: Observable<(URL, Event<Data>)>
    let images = users.map { $0.categories ?? [] }
        .map { $0.flatMap { $0.profiles ?? [] } }
        .map { $0.compactMap { $0.thumbnail } }
        .map { $0.compactMap { URL(string: $0) } }
        .map { $0.map { ($0, URLRequest(url: $0)) } }
        .flatMap { 
            Observable.combineLatest($0.map { 
                Observable.combineLatest(
                    Observable.just($0.0), 
                    URLSession.shared.rx.data(request: $0.1)
                        .materialize()
                ) 
            }) 
        }
        .flatMap { Observable.from($0) }
        .share(replay: 1)
    
    // this chain filters out the errors and saves the successful downloads.
    images
        .filter { $0.1.element != nil }
        .map { ($0.0, $0.1.element!) }
        .map { ($0.0, UIImage(data: $0.1)!) }
        .observeOn(MainScheduler.instance)
        .bind(onNext: { url, image in
            try? Disk.save(image, to: .caches, as: url.lastPathComponent)
            return // need two lines here because this needs to return Void, not Void?
        })
        .disposed(by: disposeBag)
    
    // this chain handles the download errors if you want to.
    images
        .filter { $0.1.error != nil }
        .bind(onNext: { url, error in
            print("failed to download \(url) because of \(error)")
        })
        .disposed(by: disposeBag)