Search code examples
iosreactive-programmingalamofirerx-swift

Managing multiple uploads with ReactiveX (on iOS with Swift and Alamofire)


I am attempting to upload multiple photos to a server using ReactiveX (RxSwift), gathering the responses from each request, and then making one final request to complete the submission.

Everything seems to be working fairly well until I attempt to reduce all of the responses. The final subscribeNext is never called. (Perhaps I misunderstand how flatMap or reduce works?)

Specifically, this is how I am attempting to perform this procedure.

  • Prepare an observable to encode each photo (self.imageMgr is an instance of PHCachingImageManager())

    func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> {
        return create { observer in
            self.imageMgr.requestImageForAsset(asset,
                targetSize: PHImageManagerMaximumSize,
                contentMode: .AspectFit,
                options: nil,
                resultHandler: { (myImage, myInfo) -> Void in
                    let data = UIImageJPEGRepresentation(myImage!, 1.0)!
                    NSLog("Encoded photo")
                    observer.onNext(data)
                    self.converts += 1
                    if self.converts == self.userReview.photos.count {
                        NSLog("Completed encoding photos")
                        observer.onCompleted()
                    }
                })
            return NopDisposable.instance
        }
    }
    
  • Prepare an observable to upload each photo once encoded (with Alamofire and RxAlamofire)

    func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> {
        return create { observer in
            NSLog("Uploading Photo")
    
            upload(.POST,
                urlRequest.URLString,
                headers: nil,
                multipartFormData: { mfd in
                    mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg")
                },
                encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold,
                encodingCompletion: { encodingResult in
                    switch encodingResult {
                    case .Success(let upload, _, _):
                        upload.responseJSON(completionHandler: { (myResponse) -> Void in
                            if let photoResponse = myResponse.result.value {
                                let photoObject = photoResponse.objectForKey("photo")!
                                let photo = ReviewPhotoObject()
                                photo.photoID = photoObject.objectForKey("id")! as! NSNumber
                                NSLog("Uploaded Photo")
                                observer.onNext(photo)
                            }
    
                            self.uploads += 1
                            if self.uploads == self.userReview.photos.count {
                                NSLog("Completed uploading photos")
                                observer.onCompleted()
                            }
                        })
    
                    case .Failure(let encodingError):
                        observer.onError(encodingError)
                        print(encodingError)
                    }
                })
    
            return NopDisposable.instance
        }
    }
    
  • Finally, put it all together

    func postReview(review: MyReview) {
        self.userReview = review
    
        _ = review.photos.toObservable().flatMap { photos in
            return self.getPhotoDataObservable(photos)
        }.flatMap { photoData in 
            return self.getPostPhotoObservable(photoData)
        }.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in
            accumulator.append(Int(photo.photoID))
            return accumulator
        }).subscribeNext({ (photoIds) -> Void in
            print(photoIds) // Never called
        })
    }
    

When run (with 2 photos for example), this is the output:

Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos

But subscribeNext is never called. Since documentation on RxSwift specifically is still a little thin, I was hoping someone around here could clue me in on what I'm misunderstanding.


Solution

  • The idea here is that once an observable is done sending all the elements it's going to send, it should complete. You are creating an observable for each PHAsset and that observable only sends one element so it should complete after that. The way you had the code, only the last one would complete, so the reduce operator was just sitting around waiting for the rest to complete before it could finish its job.

    Here is how I would have written the first function (In Swift 3 instead of 2.)

    extension PHImageManager {
    
        func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
            return .create { observer in
                let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
                    if let image = image {
                        observer.onNext(image)
                        observer.onCompleted()
                    }
                    else if let info = info, let error = info[PHImageErrorKey] as? Error {
                        observer.onError(error)
                    }
                })
                return Disposables.create { self.cancelImageRequest(request) }
            }
        }
    }
    

    You will see that I would have made it an extension on PHImageManager instead of a free function, but that's just a style difference. The functional differences are that my code will emit an error if the underlying request errors out, and will cancel the request if the subscribers all bail before the request completes. Also, it doesn't do the JPEG conversion. Keep these operations small and do the JPEG conversion inside a map like this:

        let imagesData = review.photos.toObservable().flatMap {
            self.imageMgr.requestMaximumSizeImage(for: $0)
        }.map {
            UIImageJPEGRepresentation($0, 1.0)
        }.filter { $0 != nil }.map { $0! }
    

    The above code requests the images from the manager, then converts them to JPEG data, filtering out any that failed the conversion. imagesData is an Observable<Data> type.

    Your getPostPhotoObservable is fine except for the completed issue, and the fact that it doesn't handle cancelation in the disposable. Also, you could just have your post function return an Observable instead of wrapping the result in a ReviewPhotoObject.

    Other warnings:

    1. The way you are putting it all together doesn't ensure that the ReviewPhotoObjects will be in the same order as the photos going in (because you can't guarantee the order that the uploads will complete in.) To fix this, if necessary, you would need to use concat instead of flatMap.

    2. If any of the uploads fail, the entire pipeline will disconnect and abort any subsequent uploads. You should probably set up something to catch the errors and do something appropriate. Either catchErrorJustReturn or catchError depending on your requirements.