Search code examples
javaandroidkotlinrx-javarx-kotlin

Is There A Limit to The Number of Observables in .zip Method?


There seems to be a limit to the number of Observables to use as parameters in the zip method for Kotlin. If this is accurate, what is the best alternative?

For instance, when I use 9 parameters it works as expected. When I add a 10th parameter I receive the error Cannot infer a type for this parameter. Please specify it explicitly

Observable.zip(
            //TODO: parameterize exchange symbols based on pair
            methodOne() as Observable<Any>),
            methodTwo() as Observable<Any>),
            methodThree() as Observable<Any>),
            methodFour() as Observable<Any>),
            methodFive() as Observable<Any>),
            methodSix() as Observable<Any>),
            methodSeven() as Observable<Any>),
            methodEight() as Observable<Any>),
            methodNine() as Observable<Any>),
            { oneResult, twoResult, threeResult, fourResult, fiveResult, sixResult, sevenResult, eightResult, nineResult ->
                    //logic here applying computation to results
            })
            .subscribe(
                    {},
                    {
                        println(String.format("Error: %s", it.message))
                    })
            .unsubscribe()
}

Solution

  • RxJava only supports up to 9 distinct sources with zip. Beyond that, you have to use the zip(Iterable<ObservableSource>, Func<Object[],R>) method and cast each element of the Object[] back to its respective type.

    Returns an Observable that emits the results of a specified combiner function applied to combinations of items emitted, in sequence, by an Iterable of other ObservableSources. zip applies this function in strict sequence, so the first item emitted by the new ObservableSource will be the result of the function applied to the first item emitted by each of the source ObservableSources; the second item emitted by the new ObservableSource will be the result of the function applied to the second item emitted by each of those ObservableSources; and so forth.

    The resulting ObservableSource<R> returned from zip will invoke onNext as many times as the number of onNext invocations of the source ObservableSource that emits the fewest items.

    The operator subscribes to its sources in order they are specified and completes eagerly if one of the sources is shorter than the rest while disposing the other sources. Therefore, it is possible those other sources will never be able to run to completion (and thus not calling doOnComplete()). This can also happen if the sources are exactly the same length; if source A completes and B has been consumed and is about to complete, the operator detects A won't be sending further values and it will dispose B immediately. For example:

    zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
    

    action1 will be called but action2 won't. To work around this termination property, use doOnDispose(Action) as well or use using() to do cleanup in case of completion or a dispose() call. Note on method signature: since Java doesn't allow creating a generic array with new T[], the implementation of this operator has to create an Object[] instead. Unfortunately, a Function<Integer[], R> passed to the method would trigger a ClassCastException.