
RxSwift: Calling onCompleted after onNext delivers only the completed event


I'm wrapping some legacy completion-block code in an Observable. It emits one event (next or error) and then completes. The problem is that calling onNext() followed by onCompleted() only delivers the completed event to the observer. Why doesn't the next event get delivered?

UPDATE: The people stream actually works as expected. The issue turns out to be in the downstream observable, filteredPeople. The inner stream's completed event is passed along to it as a materialized element, and I'm returning that event unchanged, which terminates the stream.

I need to filter out completed events from inner streams.

let people = Observable<Event<[Person]>>()
    .flatMapLatest {
        return fetchPeople().asObservable().materialize()
    }
    .share()

// this is bound to a search field
let filterText = PublishSubject<String>()

let filteredPeople = Observable.combineLatest(people, filterText) { peopleEvent, filter in

    // This is the problem: the completed event from people is returned as-is, and it terminates the stream
    guard let people = peopleEvent.element else { return peopleEvent }

    // Use the closure parameter `filter` (a String), not the `filterText` subject
    if filter.isEmpty { return .next(people) }

    return .next(people.filter { ... })
}

func fetchPeople() -> Single<[Person]> {
    return Single<[Person]>.create { observer in
        PeopleService.fetch { result in
            switch result {
            case .success(let people):
                observer(.success(people))
            case .failure(let error):
                observer(.error(error))
            }
        }

        return Disposables.create()
    }
}

filteredPeople.subscribe(
    onNext: { event in
        // ?! doesn't get called
    },
    onError: { error in
        ...
    },
    onCompleted: {
        // we get here, but why?
    }).disposed(by: disposeBag)
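
The behavior described in the update can be reproduced in isolation. The following is a minimal sketch, not the original code: `trigger`, the `[String]` payload, and the immediately succeeding `Single` are hypothetical stand-ins for the real reload source and `fetchPeople()`. It suggests that `materialize()` on a `Single` yields two elements, the value and then a completed event, so the completed event becomes the latest value that `combineLatest` sees.

```swift
import RxSwift

// Hypothetical stand-ins for the real sources (assumptions, not from the question).
let trigger = PublishSubject<Void>()
let filterText = BehaviorSubject<String>(value: "")

// Each trigger starts a one-shot fetch and materializes its events.
let people = trigger.flatMapLatest { _ in
    Single<[String]>.just(["Ada", "Grace"]).asObservable().materialize()
}

var seen: [String] = []

// Record which kind of people event combineLatest hands to the closure.
_ = Observable.combineLatest(people, filterText) { event, _ -> String in
        switch event {
        case .next(let names): return "next(\(names.count))"
        case .error: return "error"
        case .completed: return "completed"
        }
    }
    .subscribe(onNext: { seen.append($0) })

trigger.onNext(())        // the inner stream emits its value, then a completed event
filterText.onNext("Gr")   // the latest people event is still the completed one

print(seen) // ["next(2)", "completed", "completed"]
```

In this sketch, once the inner stream finishes, every later change to the filter text is combined with the completed event rather than with the list of people.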

Solution

  • I fixed it by filtering out completed events from the inner stream, so they never reach combineLatest. I am not sure this is the right way, but I can't think of a better solution.

    let people = Observable<Event<[Person]>>()
        .flatMapLatest {
            return fetchPeople()
                .asObservable()
                .materialize()
                // Our work is done, but don't end the parent stream
                .filter { !$0.isCompleted }
        }
        .share()
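
To check that the filter has the intended effect, here is a small self-contained sketch under the same kind of assumptions: `trigger`, the `[String]` payload, and the prefix-matching filter are hypothetical and stand in for the real reload source, `fetchPeople()`, and the elided filtering logic. With the completed event dropped inside `flatMapLatest`, the latest people event stays a next event, so later filter changes are still applied to the fetched list.

```swift
import RxSwift

// Hypothetical stand-ins for the real sources (assumptions, not from the answer).
let trigger = PublishSubject<Void>()
let filterText = BehaviorSubject<String>(value: "")

let people = trigger.flatMapLatest { _ in
    Single<[String]>.just(["Ada", "Grace"])
        .asObservable()
        .materialize()
        // Drop the inner completed event so it never becomes the latest value
        .filter { !$0.isCompleted }
}

var results: [[String]] = []

_ = Observable.combineLatest(people, filterText) { event, text -> [String] in
        // Error events carry no element; this sketch maps them to an empty list.
        let names = event.element ?? []
        return text.isEmpty ? names : names.filter { $0.hasPrefix(text) }
    }
    .subscribe(onNext: { results.append($0) })

trigger.onNext(())        // fetch completes; only the next event gets through
filterText.onNext("Gr")   // filtering still sees the fetched names

print(results) // [["Ada", "Grace"], ["Grace"]]
```

Error events still pass through the filter, so a real implementation would need to decide how to surface them instead of mapping them to an empty list as this sketch does.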