Swift Combine alternative to Rx Observable.create


I have some code that is built using RxSwift, and I'm playing around with converting it to use Apple's Combine framework.

One pattern which is very common is the use of Observable.create for one-shot observables (usually network requests). Something like this:

func loadWidgets() -> Observable<[Widget]> {
  return Observable.create { observer in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      observer.onNext(widgets)
      observer.onCompleted()
    }, error: { error in
      // publish error on failure
      observer.onError(error)
    })
    // allow cancellation
    return Disposables.create {
      loadTask.cancel()
    }
  }
}

I'm trying to map that across to Combine and I haven't been able to quite figure it out. The closest I've been able to get is using Future for something like this:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
  return Future<[Widget], Error> { resolve in
    // start the request (note: Future runs this closure as soon as it is created)
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      resolve(.success(widgets))
    }, error: { error in
      // publish error on failure
      resolve(.failure(error))
    })
    // allow cancellation ???
  }
  .eraseToAnyPublisher()
}

As you can see, it does most of the job, but there's no way to cancel the request. Also, Future can only deliver a single result.
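
A related detail about the Future attempt: Future runs its closure as soon as it is created, not when someone subscribes, so the request would start immediately. Wrapping the Future in Deferred is a common way to delay the work until subscription. The sketch below only illustrates that difference. `Probe`, `eagerAnswer`, and `lazyAnswer` are hypothetical names, and a synchronous promise stands in for the real network call. Note that Deferred does not add cancellation of the underlying task.

```swift
import Combine

// Hypothetical helper that records when work starts and what was received.
final class Probe {
    var started = 0
    var received: [Int] = []
}

// Eager: Future runs its closure as soon as the Future is created.
func eagerAnswer(_ probe: Probe) -> AnyPublisher<Int, Never> {
    Future<Int, Never> { promise in
        probe.started += 1
        promise(.success(42))
    }
    .eraseToAnyPublisher()
}

// Lazy: Deferred postpones creating the Future until a subscriber arrives.
func lazyAnswer(_ probe: Probe) -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            probe.started += 1
            promise(.success(42))
        }
    }
    .eraseToAnyPublisher()
}

let probe = Probe()

let eager = eagerAnswer(probe)
let startedAfterEager = probe.started   // the work already ran, with no subscriber

let lazy = lazyAnswer(probe)
let startedAfterLazy = probe.started    // unchanged: the deferred work has not run

let cancellable = lazy.sink { probe.received.append($0) }
let startedAfterSink = probe.started    // the deferred work ran on subscription
```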

Is there any way to do something like the Rx Observable.create pattern which allows cancellation and optionally multiple results?


Solution

  • I think I found a way to mimic Observable.create using a PassthroughSubject in Combine. Here is the helper I made:

    import Combine

    // Rx-style observer: three callbacks that forward into a Combine subject.
    struct AnyObserver<Output, Failure: Error> {
        let onNext: ((Output) -> Void)
        let onError: ((Failure) -> Void)
        let onComplete: (() -> Void)
    }
    
    // Rx-style disposable: wraps the teardown work to run on cancellation.
    struct Disposable {
        let dispose: () -> Void
    }
    
    extension AnyPublisher {
        static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
            // Note: this one subject is shared by every subscriber of the returned publisher.
            let subject = PassthroughSubject<Output, Failure>()
            var disposable: Disposable?
            return subject
                .handleEvents(receiveSubscription: { subscription in
                    // A subscriber arrived: start the work and keep its teardown handle.
                    disposable = subscribe(AnyObserver(
                        onNext: { output in subject.send(output) },
                        onError: { failure in subject.send(completion: .failure(failure)) },
                        onComplete: { subject.send(completion: .finished) }
                    ))
                }, receiveCancel: { disposable?.dispose() }) // the subscriber cancelled: run the teardown
                .eraseToAnyPublisher()
        }
    }
    

    And here is how it looks in usage:

    func loadWidgets() -> AnyPublisher<[Widget], Error> {
        AnyPublisher.create { observer in
            let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
                observer.onNext(widgets)
                observer.onComplete()
            }, error: { error in
                observer.onError(error)
            })
            return Disposable {
                loadTask.cancel()
            }
        }
    }
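
To see the cancellation and multiple-value behaviour in isolation, here is a small self-contained sketch. It repeats the helper from this answer so it can run on its own. `Probe` is a hypothetical stand-in for an asynchronous source such as `WidgetLoader`: it stores the observer's callback so values can be pushed by hand.

```swift
import Combine

// Helper types repeated from the answer so this sketch is self-contained.
struct AnyObserver<Output, Failure: Error> {
    let onNext: (Output) -> Void
    let onError: (Failure) -> Void
    let onComplete: () -> Void
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { _ in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

// Hypothetical stand-in for an asynchronous source.
final class Probe {
    var emit: ((Int) -> Void)?   // callback captured from the observer
    var cancelled = false        // set when the Disposable runs
    var received: [Int] = []     // values seen by the subscriber
}

let probe = Probe()
let numbers = AnyPublisher<Int, Never>.create { observer in
    probe.emit = observer.onNext               // keep the callback to push values later
    return Disposable { probe.cancelled = true }
}

let cancellable = numbers.sink { probe.received.append($0) }
probe.emit?(1)
probe.emit?(2)          // a second value is delivered, unlike with Future
cancellable.cancel()    // triggers receiveCancel, which runs the Disposable
probe.emit?(3)          // ignored: the subscriber has already cancelled
```

One thing to keep in mind with this design: the subject is created once per call to `create`, so every subscriber to the same returned publisher shares it, and each new subscription runs the `subscribe` closure again. If each subscriber should get an independent stream, creating the subject inside a `Deferred` block is one option worth trying.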