I have three Observable<[Recipe]>
streams that I want to combine into one single Observable<[Recipe]>
.
let breakfast = Database.database().reference().rx.fetchFireBaseData(recipeType: .breakfast)
.map { $0.filter { UserDefaults.standard.favoriteRecipes.contains($0.fbKey) }}
let dessert = Database.database().reference().rx.fetchFireBaseData(recipeType: .dessert)
.map { $0.filter { UserDefaults.standard.favoriteRecipes.contains($0.fbKey) }}
let cookies = Database.database().reference().rx.fetchFireBaseData(recipeType: .cookies)
.map { $0.filter { UserDefaults.standard.favoriteRecipes.contains($0.fbKey) }}
If I use Observable.zip(breakfast, dessert, cookies).do(onNext: { [weak self] value in...
I get ([Recipe], [Recipe], [Recipe])
If I use Observable.merge(breakfast, dessert, cookies).do(onNext: { [weak self] value in...
I get the streams in turns, meaning that the last one completed overrides the previous two.
If I use Observable.concat(breakfast, dessert, cookies).do(onNext: { [weak self] value in...
I get sort of the same behavior as with merge
.
So how do I combine the three streams into one without having them overwriting one another?
To get a better idea what I'm trying to do:
return Observable.merge(breakfast, cookies, dessert).do(onNext: { [weak self] value in
self?.content.accept(value.compactMap { FavoritesCollectionViewCellViewModel(favorite: $0)})
})
I want to accept
all combined streams to the behaviorRelay
, mapped to cell
data. The content
relay is then bound
to a collectionView
. When watching the collectionView
, with concat
I can see first and second stream briefly, then they are replaced with the last stream.
You may not know, but combineLatest (and zip) has a final parameter for manipulating the inputs. So you could do something like:
let allRecipes = Observable.combineLatest(breakfast, dessert, cookies) { $0 + $1 + $2 }
Which is equivalent to:
Observable.combineLatest(breakfast, dessert, cookies)
.map { $0.0 + $0.1 + $0.2 }
Which is essentially the same as what you ended up with, except you used zip
instead of combineLatest
Using zip
assumes that all three Observables will emit exactly the same number of elements. By using combineLatest, you assume that each Observable emits at least one element, but they don't have to all emit the same number.
If you wanted to avoid even that assumption, then add a .startWith
to each source.
Observable.combineLatest(
breakfast.startWith([]),
dessert.startWith([]),
cookies.startWith([])
) { $0 + $1 + $2 }
With the above, as soon as any one source observable emits, you will get something in your output, and another, larger, array each time the others emit.
Note that in all of the above (including your answer) an error in any one of the source observables will cause the entire thing to error out. If you want to avoid that, you will have to catch the errors at each source Observable. Exactly what that code looks like depends on what you want to do with the errors.