
How to combine a dynamic number of Observables in RxJava 2?


I'm stuck on a rather weird problem and would appreciate your help.

The overall setting is the following: I have a bunch of data providers that can appear or disappear at runtime. These providers provide - big surprise - data, modeled as io.reactivex.Observable instances. (To be precise: as BehaviorSubjects, which replay the latest value to new subscribers.)

Now, I need to combine the data of all current data providers, so that I get a new "main" Observable which gets updated whenever any data provider's observable changes or when new providers appear (or old ones disappear).

So far, that sounds like merging, but the main Observable needs the data of all providers combined: on any change, it should emit each provider's most recent state. This combining works fine with Observable.combineLatest for a fixed set of providers that is known in advance.

But problems arise when I embed that method in a dynamic context, listening for added or removed providers. Then an update of one provider's Observable triggers not just one update, as expected, but several, some of them containing only partial data.

Have a look at the following self-contained example (using RxJava 2.1.9), which should clarify my problem. The comments are written as println() calls, so that the produced output is readable, too. The first part is static, without adding or removing providers, and works as expected. The second part is the weird thing...

I'd appreciate any further ideas or assistance to solve this issue - thanks!

import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CombineLatestWithDynamicSources {

    static final Function<Object[], List<String>> STRING_COMBINER = objects -> Arrays.stream(objects)
                                                                                     .map(Object::toString)
                                                                                     .collect(Collectors.toList());

    public static void main(String[] args) {

        System.out.println("*** STATIC ***");
        staticCombineWorksAsExpected();

        System.out.println("\n*** DYNAMIC ***");
        dynamicCombineBehavesWeird();

    }

    static void staticCombineWorksAsExpected() {

        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");

        List<Subject<String>> subjects = Arrays.asList(subjectA, subjectB); // potentially more...

        Observable<List<String>> combined = Observable.combineLatest(subjects, STRING_COMBINER);

        System.out.println("initial values:");
        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("updating A:");
        subjectA.onNext("A.2");

        System.out.println("updating B:");
        subjectB.onNext("B.2");

        System.out.println("\n... works as expected, but adding subjects isn't possible in this setting.");
    }

    static void dynamicCombineBehavesWeird() {

        List<Subject<String>> subjectsList = new ArrayList<>();
        Subject<List<Subject<String>>> subjectsObservable = BehaviorSubject.createDefault(subjectsList);

        System.out.println("subjects are initially empty:");
        subjectsObservable.subscribe(subjects -> System.out.println(">> Subjects: " + subjects));

        Observable<List<String>> combined = subjectsObservable.flatMap(
                subjects -> Observable.combineLatest(subjects, STRING_COMBINER));

        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("add Subject A, providing default value 'A.1' - as expected:");
        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        subjectsList.add(subjectA);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A - also as expected:");
        subjectA.onNext("A.2");

        System.out.println("add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");
        subjectsList.add(subjectB);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!");
        subjectA.onNext("A.3");

        System.out.println("This doesn't happen on updating B...:");
        subjectB.onNext("B.2");

        System.out.println("digging deeper, add Subject C, providing default value 'C.1' - as expected again:");
        Subject<String> subjectC = BehaviorSubject.createDefault("C.1");
        subjectsList.add(subjectC);
        subjectsObservable.onNext(subjectsList);

        System.out.println("Now update A - three results pop up, only the last is expected!");
        subjectA.onNext("A.4");

        System.out.println("update B, which now emits also two results - last expected only:");
        subjectB.onNext("B.3");

        System.out.println("update C works as expected:");
        subjectC.onNext("C.2");

        System.out.println("\n... huh? Seems on updating the first source, the combined results gets computed for the first, " + "then for the first and second, then for first, second and third (and so on?) source...");

    }

}

This produces the following output:

*** STATIC ***
initial values:
>> Combined: [A.1, B.1]
updating A:
>> Combined: [A.2, B.1]
updating B:
>> Combined: [A.2, B.2]

... works as expected, but adding subjects isn't possible in this setting.

*** DYNAMIC ***
subjects are initially empty:
>> Subjects: []
add Subject A, providing default value 'A.1' - as expected:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e]
>> Combined: [A.1]
updating A - also as expected:
>> Combined: [A.2]
add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd]
>> Combined: [A.2, B.1]
updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!
>> Combined: [A.3]
>> Combined: [A.3, B.1]
This doesn't happen on updating B...:
>> Combined: [A.3, B.2]
digging deeper, add Subject C, providing default value 'C.1' - as expected again:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd, io.reactivex.subjects.BehaviorSubject@47f6473]
>> Combined: [A.3, B.2, C.1]
Now update A - three results pop up, only the last is expected!
>> Combined: [A.4]
>> Combined: [A.4, B.2]
>> Combined: [A.4, B.2, C.1]
update B, which now emits also two results - last expected only:
>> Combined: [A.4, B.3]
>> Combined: [A.4, B.3, C.1]
update C works as expected:
>> Combined: [A.4, B.3, C.2]

... huh? Seems on updating the first source, the combined results gets computed for the first, then for the first and second, then for first, second and third (and so on?) source...


Solution

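  • Some time has passed, and now I know better... ;-)

    The solution is simple: use switchMap instead of flatMap. flatMap keeps every inner combineLatest subscribed, so the Observables created for the earlier, shorter provider lists keep emitting alongside the newest one. switchMap disposes the previous inner Observable whenever the provider list changes, so only the latest combination emits: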

    subjectsObservable.switchMap(subjects -> Observable.combineLatest(subjects, STRING_COMBINER))
                      .subscribe(strings -> System.out.println(">> Combined: " + strings));
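
For completeness, here is a minimal, self-contained sketch of the switchMap variant, including the removal of a provider. The class name SwitchMapCombineDemo, the run() helper and the defensive list copies are illustrative assumptions, not part of the original code; only RxJava 2 calls already used in the question (plus switchMap) appear.

```java
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; names are illustrative only.
class SwitchMapCombineDemo {

    // Same combiner as in the question: turns the latest values into a list of strings.
    static final Function<Object[], List<String>> STRING_COMBINER =
            objects -> Arrays.stream(objects)
                             .map(Object::toString)
                             .collect(Collectors.toList());

    // Runs a small add/update/remove scenario and returns every combined emission in order.
    static List<List<String>> run() {
        List<List<String>> emissions = new ArrayList<>();

        List<Subject<String>> current = new ArrayList<>();
        List<Subject<String>> initial = new ArrayList<>(current);
        Subject<List<Subject<String>>> providers = BehaviorSubject.createDefault(initial);

        // switchMap disposes the previous combineLatest whenever a new provider list arrives.
        Observable<List<String>> combined = providers.switchMap(
                subjects -> Observable.combineLatest(subjects, STRING_COMBINER));
        combined.subscribe(strings -> emissions.add(strings));

        // Add provider A, then update it.
        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        current.add(subjectA);
        providers.onNext(new ArrayList<>(current)); // defensive copy (assumption, not in the original)
        subjectA.onNext("A.2");

        // Add provider B, then update A: only one combined result per update.
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");
        current.add(subjectB);
        providers.onNext(new ArrayList<>(current));
        subjectA.onNext("A.3");

        // Remove provider A; its later updates no longer reach the combined stream.
        current.remove(subjectA);
        providers.onNext(new ArrayList<>(current));
        subjectA.onNext("A.4");
        subjectB.onNext("B.2");

        return emissions;
    }

    public static void main(String[] args) {
        for (List<String> strings : run()) {
            System.out.println(">> Combined: " + strings);
        }
    }
}
```

As in the question, the initially empty provider list produces no combined value, because combineLatest over an empty list completes without emitting. If an empty list should be emitted in that case, it would have to be handled separately inside the switchMap lambda.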