Search code examples
java buffer rx-java

Rx Zip buffered observables of uneven length


The problem I'm trying to solve is expressed in the following code:

/**
 * Demonstrates the problem: zipping two buffered observables of uneven length.
 *
 * <p>The four strings are buffered into two lists and the eleven integers into four
 * lists; the zipped stream pairs them up and prints each pair as it arrives.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for
 *                              the stream to terminate
 */
@Test
public void buffer_shouldZipAllTheThings() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    // Single-element holder so the anonymous subscriber can hand an error back to the test thread.
    final Throwable[] failure = new Throwable[1];

    Observable<List<String>> strings = Observable.from(Arrays.asList("red", "blue", "yellow", "green")).buffer(3);
    Observable<List<Integer>> integers = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).buffer(3);

    // Simple holder for one buffer from each source.
    class Zipped {
        final List<String> strings;
        final List<Integer> integers;

        Zipped(List<String> strings, List<Integer> integers) {
            this.strings = strings;
            this.integers = integers;
        }

        @Override public String toString() {
            return "Strings: { " + strings.toString() + " }\n" + "Integers: { " + integers.toString() + " }";
        }
    }

    Observable<Zipped> zipper = Observable.zip(strings, integers, new Func2<List<String>, List<Integer>, Zipped>() {
        @Override public Zipped call(List<String> strings, List<Integer> integers) {
            return new Zipped(strings, integers);
        }
    });

    zipper.subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Zipped>() {
        @Override public void onCompleted() {
            latch.countDown();
        }

        @Override public void onError(Throwable throwable) {
            // Record the error and release the latch; otherwise await() below would block forever.
            failure[0] = throwable;
            latch.countDown();
        }

        @Override public void onNext(Zipped zipped) {
            System.out.println("-- New zipped object: ");
            System.out.println(zipped.toString());
        }
    });

    // Block until the subscriber reports completion or an error.
    latch.await();

    // Surface a stream error as a test failure instead of passing silently.
    if (failure[0] != null) {
        AssertionError error = new AssertionError("zipped stream terminated with an error");
        error.initCause(failure[0]);
        throw error;
    }
}

Here is the output of this test:

-- New zipped object: 
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object: 
Strings: { [green] }
Integers: { [4, 5, 6] }

Here is my desired output:

-- New zipped object: 
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object: 
Strings: { [green] }
Integers: { [4, 5, 6] }
-- New zipped object: 
Strings: { [] }
Integers: { [7, 8, 9] }
-- New zipped object: 
Strings: { [] }
Integers: { [10, 11] }

Solution

  • How about making each source infinite by concatenating an endless stream of empty lists onto it, and then using takeWhile to stop the stream once both zipped buffers are empty?

    /**
     * Zips two buffered sources of uneven length without dropping the longer tail.
     *
     * <p>Each buffered source is concatenated with a repeating empty list, and the zipped
     * stream is cut off by {@code takeWhile} at the first pair whose buffers are both empty.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the stream to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Holder for one buffer from each source.
        class Zipped {
            final List<String> strings;
            final List<Integer> integers;

            Zipped(List<String> strings, List<Integer> integers) {
                this.strings = strings;
                this.integers = integers;
            }

            @Override
            public String toString() {
                StringBuilder text = new StringBuilder();
                text.append("Strings: { ").append(strings.toString()).append(" }\n");
                text.append("Integers: { ").append(integers.toString()).append(" }");
                return text.toString();
            }
        }

        final CountDownLatch done = new CountDownLatch(1);

        // The finite, buffered sources.
        Observable<List<String>> bufferedStrings =
                Observable.from(Arrays.asList("red", "blue", "yellow", "green")).buffer(3);
        Observable<List<Integer>> bufferedIntegers =
                Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).buffer(3);

        // Repeating empty lists used as padding once a finite source has run out.
        Observable<List<String>> emptyStringLists =
                Observable.<List<String>>from(new ArrayList<String>()).repeat(Schedulers.newThread());
        Observable<List<Integer>> emptyIntegerLists =
                Observable.<List<Integer>>from(new ArrayList<Integer>()).repeat(Schedulers.newThread());

        Observable<List<String>> paddedStrings = Observable.concat(bufferedStrings, emptyStringLists);
        Observable<List<Integer>> paddedIntegers = Observable.concat(bufferedIntegers, emptyIntegerLists);

        // Pairs one buffer from each side, logging the raw pair first.
        Func2<List<String>, List<Integer>, Zipped> pairUp = new Func2<List<String>, List<Integer>, Zipped>() {
            @Override
            public Zipped call(List<String> left, List<Integer> right) {
                System.out.println(left + " " + right);
                return new Zipped(left, right);
            }
        };

        // Keeps the stream alive while at least one side still carries data.
        Func1<Zipped, Boolean> hasContent = new Func1<Zipped, Boolean>() {
            @Override
            public Boolean call(Zipped pair) {
                boolean anyLeft = !(pair.strings.isEmpty() && pair.integers.isEmpty());
                System.out.println(anyLeft);
                return anyLeft;
            }
        };

        Observable<Zipped> zipper = Observable.zip(paddedStrings, paddedIntegers, pairUp).takeWhile(hasContent);

        Subscriber<Zipped> printer = new Subscriber<Zipped>() {
            @Override
            public void onNext(Zipped pair) {
                System.out.println("-- New zipped object: ");
                System.out.println(pair.toString());
            }

            @Override
            public void onError(Throwable cause) {
                cause.printStackTrace();
                done.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Finish");
                done.countDown();
            }
        };

        zipper.subscribeOn(Schedulers.newThread()).subscribe(printer);

        // Wait here until the subscriber signals completion or an error.
        done.await();
    }