1) I have a method Observable<List<Movie>> moviesWithoutGenres() which returns a list of movies, but each movie's genres field is null.
2) I have a method Observable<List<String>> movieGenres(Movie m) that returns the list of genres for the specified movie.
I need to implement a method that returns Observable<List<Movie>>, where each movie in the list has its list of genres populated.
I have implemented such a stream, but my solution converts to Observable<Movie> using Observable.from() and then back to Observable<List<Movie>> using the toList() operator. This solution is not acceptable because I am using the SQLBrite wrapper for reactive SQL queries under the hood of the first method. onCompleted is never called, because the streams always remain open so that changes to the SQL tables can be propagated to all subscribers. So the toList operator never emits.
public class Movie {
    private String id;
    private String title;
    private List<String> genres;
    ...
}
1) Observable<List<Movie>> moviesWithoutGenres();
2) Observable<List<String>> movieGenres(Movie m);
Should be implemented:
Observable<List<Movie>> movies();
public Observable<List<Movie>> movies() {
    return moviesWithoutGenres()
        .switchMap(movies -> Observable.from(movies) // This is an Observable<Movie>
            .concatMap(movie -> movieGenres(movie)
                .first()
                .map(genres -> new Movie(movie.id, movie.title, genres)))
            .toList());
}
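Observable.from() emits the items in the list followed by onCompleted(). The trick is that, since movieGenres() returns an endless observable, you have to apply .first() to it so that onCompleted() is called and toList() works properly. Because toList() is applied to the inner observable inside switchMap(), the outer moviesWithoutGenres() stream can stay open and still re-emit whenever the movies table changes.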
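To make the completion point concrete, here is a minimal sketch in plain Java with no RxJava dependency. Everything in it (ToListDemo, Source, Observer, endless, first, toList, collect) is a hypothetical toy model written for illustration, not the RxJava or SQLBrite API, and it simplifies details such as error handling. It only shows why a buffering operator like toList() stays silent on a stream that never completes, and why taking the first item unblocks it.

```java
import java.util.ArrayList;
import java.util.List;

class ToListDemo {
    /** Toy observer (hypothetical): receives items, then possibly a completion signal. */
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Toy source (hypothetical): pushes signals to an observer on subscribe. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** Emits one item and never completes, like a query stream that stays open. */
    static <T> Source<T> endless(T item) {
        return observer -> observer.onNext(item);
    }

    /** Passes the first item through and then completes immediately. */
    static <T> Source<T> first(Source<T> upstream) {
        return observer -> upstream.subscribe(new Observer<T>() {
            boolean done;

            public void onNext(T item) {
                if (done) return;          // ignore anything after the first item
                done = true;
                observer.onNext(item);
                observer.onCompleted();    // this is what lets toList() emit
            }

            public void onCompleted() {
                // Simplification: an empty upstream just completes here.
                if (!done) { done = true; observer.onCompleted(); }
            }
        });
    }

    /** Buffers every item and emits the buffer only when upstream completes. */
    static <T> Source<List<T>> toList(Source<T> upstream) {
        return observer -> upstream.subscribe(new Observer<T>() {
            final List<T> buffer = new ArrayList<>();

            public void onNext(T item) { buffer.add(item); }

            public void onCompleted() {
                observer.onNext(buffer);
                observer.onCompleted();
            }
        });
    }

    /** Subscribes and returns everything the source emitted synchronously. */
    static <T> List<T> collect(Source<T> source) {
        List<T> received = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            public void onNext(T item) { received.add(item); }
            public void onCompleted() { }
        });
        return received;
    }

    public static void main(String[] args) {
        // No completion signal, so the buffered list is never emitted.
        System.out.println(collect(toList(endless("Drama"))));        // prints []
        // first() completes after one item, so the list comes through.
        System.out.println(collect(toList(first(endless("Drama"))))); // prints [[Drama]]
    }
}
```

In the answer's code the same thing happens per movie: each movieGenres(movie).first() completes after one emission, so the inner Observable.from(movies) chain completes and toList() can emit the finished list.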