
Creating a Combine publisher like RxSwift's Observable.create for an Alamofire request


I use the following piece of code to generate a cold RxSwift Observable:

func doRequest<T: Mappable>(request: URLRequestConvertible) -> Observable<T> {
    let observable = Observable<T>.create { [weak self] observer in
        guard let self = self else { return Disposables.create() }
        self.session.request(request).validate().responseObject { (response: AFDataResponse<T>) in
            switch response.result {
            case .success(let obj):
                observer.onNext(obj)
                observer.onCompleted()
            case .failure(let error):
                let theError = error as Error
                observer.onError(theError)
            }
        }
        return Disposables.create()
    }
    return observable
}

where Mappable is an ObjectMapper-based type, and self.session is an Alamofire Session object.

I can't find an equivalent to Observable.create {...} in Apple's Combine framework. The only thing I found is URLSession.shared.dataTaskPublisher(for:), which creates a publisher using Apple's URLSession class.

How can I convert the above observable to a Combine publisher backed by Alamofire?

EDIT: using the solution provided by rob, I ended up with the following:

private let apiQueue = DispatchQueue(label: "API", qos: .default, attributes: .concurrent)

func doRequest<T>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> where T: Mappable {
    Deferred { [weak self] () -> Future<T, AFError> in
        guard let self = self else {
            return Future<T, AFError> { promise in
                promise(.failure(.explicitlyCancelled))
            }
        }

        return Future { promise in
            self.session
                .request(request)
                .validate()
                .responseObject { (response: AFDataResponse<T>) in
                    promise(response.result)
                }
        }
    }
    .handleEvents(receiveCompletion: { completion in
        if case .failure(let error) = completion {
            // handle the error
        }
    })
    .receive(on: self.apiQueue)
    .eraseToAnyPublisher()
}

EDIT2: I removed the private queue since it isn't needed: Alamofire already does the parsing and decoding on its own, so the queue and its usage (.receive(on: self.apiQueue)) can be dropped.


Solution

  • You can use Future to connect responseObject's callback to a Combine Publisher. I don't have Alamofire handy for testing, but I think the following should work:

    func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
        return Future { promise in
            self.session
                .request(request)
                .validate()
                .responseObject { (response: AFDataResponse<T>) in
                    promise(response.result)
                }
        }.eraseToAnyPublisher()
    }
    

    Note that this is somewhat simpler than the RxSwift version because promise takes a Result directly, so we don't have to switch over response.result.

    A Future is sort of a “lukewarm” publisher. It is like a hot observable because it executes its body immediately and only once, so it starts the Alamofire request immediately. It is also like a cold observable, because every subscriber eventually receives a value or an error (assuming you eventually call promise). The Future only executes its body once, but it caches the Result you pass to promise.
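
    The "runs immediately, runs once, caches the result" behaviour can be seen without Alamofire. The following is a minimal sketch that uses only Combine; the counter bodyRuns and the value 42 are made up for illustration and stand in for starting the network request and receiving its response:

    ```swift
    import Combine

    var bodyRuns = 0  // illustrative counter: how many times the Future's closure executes

    // The closure runs here, at creation time, before anyone subscribes.
    let future = Future<Int, Never> { promise in
        bodyRuns += 1
        promise(.success(42))
    }
    let runsBeforeSubscribing = bodyRuns

    var received: [Int] = []
    // Each subscriber gets the cached result; the closure is not run again.
    let first = future.sink { received.append($0) }
    let second = future.sink { received.append($0) }
    ```

    Here the closure has already run by the time the first sink is attached, and attaching a second subscriber does not run it again. With an Alamofire request in the closure, that means a single request is started as soon as the Future is created.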

    You can create a truly cold publisher by wrapping the Future in a Deferred:

    func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
        return Deferred {
            Future { promise in
                self.session
                    .request(request)
                    .validate()
                    .responseObject { (response: AFDataResponse<T>) in
                        promise(response.result)
                    }
            }
        }.eraseToAnyPublisher()
    }
    

    Deferred calls its body to create a new inner Publisher every time you subscribe to it. So each time you subscribe, you'll create a new Future that will immediately start a new Alamofire request. This is useful if you want to use the retry operator, as in this question.
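
    To illustrate why this matters for retry, here is a small sketch that again uses only Combine. simulatedRequest, TransientError and the attempts counter are hypothetical stand-ins for the Alamofire call; the simulated request fails on its first attempt and succeeds on the second:

    ```swift
    import Combine

    struct TransientError: Error {}  // hypothetical error type for the simulation

    var attempts = 0  // how many times the simulated request has been started

    // Hypothetical stand-in for the Alamofire call:
    // fails on the first attempt, succeeds on every later one.
    func simulatedRequest() -> AnyPublisher<String, TransientError> {
        Deferred {
            Future<String, TransientError> { promise in
                attempts += 1
                if attempts < 2 {
                    promise(.failure(TransientError()))
                } else {
                    promise(.success("payload"))
                }
            }
        }
        .eraseToAnyPublisher()
    }

    var values: [String] = []
    var failed = false

    let cancellable = simulatedRequest()
        .retry(1)  // resubscribes once on failure, so Deferred builds a fresh Future
        .sink(
            receiveCompletion: { if case .failure = $0 { failed = true } },
            receiveValue: { values.append($0) }
        )
    ```

    Without the Deferred wrapper, retry would resubscribe to the same Future and simply receive the cached failure again, because the Future's closure never runs a second time.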