
Collecting Observables to a List doesn't seem to emit the collection at once


I'm using RxJava to collect individually emitted items and combine them into a single list (essentially the opposite of flatMap). Here's my code:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             });

I notice that my onNext is never called, and I can't understand why. If I remove the .toList call and output the individual Users (as shown below), it works and emits each item.

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("User emitted: %s", user.getName());
               }
             }));

Q1. Is my understanding of .toList incorrect?

Q2. How does one combine a stream of individually emitted Observable<Object> items into a single Observable<List<Object>>?

EDIT

@kjones totally nailed the issue. I was not calling onCompleted in my findMemberships call. I've added the code snippet below. My real use case was a little more convoluted, with several more transformations, which is why I needed the .toList call. As @zsxwing also rightly pointed out, for just this use case a simple map will suffice.

public Observable<List<Membership>> findMemberships() {
  return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
    @Override
    public void call(Subscriber<? super List<Membership>> subscriber) {
      try {
        // .....
        List<Membership> memberships = queryMyDb();

        subscriber.onNext(memberships);

        // BELOW STATEMENT FIXES THE PROBLEM ><
        // subscriber.onCompleted();

      } catch (SQLException e) {
        // ...
      }
    }
  });
}
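
The fix above comes down to the source always ending with a terminal event. A minimal plain-Java sketch of that idea follows, without RxJava on the classpath. `Sink`, `emit` and `trace` are hypothetical names invented for this illustration, and forwarding the exception to `onError` is an assumption, since the catch block in the snippet is elided.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TerminalEventSketch {
    /** Hypothetical cut-down subscriber with the three Rx-style callbacks. */
    interface Sink<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Runs the query and ends with exactly one terminal event on either path. */
    static <T> void emit(Callable<T> query, Sink<T> sink) {
        T result;
        try {
            result = query.call();
        } catch (Exception e) {
            sink.onError(e);   // failure path: terminal event (assumed handling)
            return;
        }
        sink.onNext(result);
        sink.onCompleted();    // success path: terminal event
    }

    /** Records the callback names so the sequence can be inspected. */
    static <T> List<String> trace(Callable<T> query) {
        final List<String> events = new ArrayList<>();
        emit(query, new Sink<T>() {
            @Override public void onNext(T item) { events.add("onNext"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
        });
        return events;
    }
}
```

With this shape a downstream operator that waits for completion is never left hanging, whether the query succeeds or throws.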


Solution

  • It looks like the Observable returned by the myEvent.findMemberships() call never calls onCompleted. Can you show this code?

    If that is the case, it would explain the behavior you are seeing. The .toList() operator will not emit the list until all items have been emitted (signaled by onCompleted).

    Your second version, without .toList(), would proceed as follows:

    .findMemberships()
        emits a single List<Membership>
    .flatMap()
        transforms List<Membership> into a single List<User>
        Observable.from(users) creates an observable that emits each user
    .subscribe()
        onNext() is called for each user
        onCompleted() is never called.
    

    Your original version:

    .findMemberships()
        emits a single List<Membership>
    .flatMap()
        transforms List<Membership> into a single List<User>
        Observable.from(users) creates an observable that emits each user
    .toList()
        buffers each User waiting for onCompleted() to be called
        onCompleted is never called because the .findMemberships Observable never completes
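
The buffering described in the trace above can be modelled in a few lines of plain Java. This is a simplified stand-in written for illustration, not RxJava's actual implementation. `ToListSketch` and its `run` helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of a toList()-style operator; not RxJava's implementation. */
final class ToListSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> downstream;

    ToListSketch(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    /** Each upstream item is only stored; nothing reaches downstream yet. */
    void onNext(T item) {
        buffer.add(item);
    }

    /** Only the completion signal releases the buffered items as one list. */
    void onCompleted() {
        downstream.accept(new ArrayList<>(buffer));
    }

    /** Hypothetical helper: feeds items, optionally completes, returns what downstream saw. */
    static <E> List<List<E>> run(List<E> items, boolean complete) {
        List<List<E>> received = new ArrayList<>();
        ToListSketch<E> op = new ToListSketch<>(received::add);
        for (E item : items) {
            op.onNext(item);
        }
        if (complete) {
            op.onCompleted();
        }
        return received;
    }
}
```

Calling `run(users, false)` leaves the downstream empty however many users were fed in, which mirrors the onNext that never fires in the question. `run(users, true)` delivers a single list.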
    

    There are several solutions:

    1) Make the findMemberships() Observable call onCompleted. This may not be desirable if the Observable returned by findMemberships() is an Rx Subject (PublishSubject, BehaviorSubject, etc.).

    2) Use Observable.just() instead of Observable.from(). You already have a List<User> in .flatMap(), so just return it. Observable.from(users) creates an Observable that emits each user individually, whereas Observable.just(users) creates an Observable that emits a single List<User>. No need for .toList().

    3) Use .map() instead of .flatMap(). Again, no need for .toList(). Since each List<Membership> gets transformed into a List<User> you only need to use .map().

    myEvent
        .findMemberships()
        .map(new Func1<List<Membership>, List<User>>() {
            @Override
            public List<User> call(List<Membership> memberships) {
                List<User> users = new ArrayList<User>();
                for (Membership membership : memberships) {
                    users.add(membership.getUser());
                }
                return users;
            }
        })
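
Because option 3 is a pure List-to-List transformation, the loop can also live in a plain static method that is testable without any Rx machinery. The sketch below assumes minimal stand-in `User` and `Membership` classes, which are hypothetical. The real types from the question are not shown in the post.

```java
import java.util.ArrayList;
import java.util.List;

final class MembershipMapping {
    /** Hypothetical minimal stand-in for the question's User type. */
    static final class User {
        final String name;
        User(String name) { this.name = name; }
    }

    /** Hypothetical minimal stand-in for the question's Membership type. */
    static final class Membership {
        private final User user;
        Membership(User user) { this.user = user; }
        User getUser() { return user; }
    }

    /** Same loop as the body of the map() call, as a pure function. */
    static List<User> toUsers(List<Membership> memberships) {
        List<User> users = new ArrayList<>(memberships.size());
        for (Membership membership : memberships) {
            users.add(membership.getUser());
        }
        return users;
    }
}
```

The Func1 passed to .map() could then simply delegate to `MembershipMapping.toUsers(memberships)`.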