Android RxJava2: zip with a vararg parameter


I'm working with RxJava2 in Kotlin on Android and am trying to write a function that zips multiple observables and subscribes an observer to the result, but something is wrong. Can anyone help? First I wrote a version that zips two observables, and it works. When I try to extend it to a vararg parameter, it fails.

fun <T> ApiSubscribeZip2(observable1: Observable<T>, observable2: Observable<T>, observer: Observer<List<T>>) {
    Observable.zip(observable1, observable2, BiFunction<T, T, List<T>> { t1: T, t2: T ->
        zipAdd(t1, t2)
    })
        ?.subscribeOn(Schedulers.io())?.unsubscribeOn(Schedulers.io())?.observeOn(AndroidSchedulers.mainThread())
        ?.subscribe(observer as Observer<in List<T>>)
}

fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
    Observable.zip(observable, Function<T, List<T>> { it ->
        zipAdd(it)

    })
}

private fun <T> zipAdd(vararg observableType: T): List<T> {
    val list = ArrayList<T>()
    for (ob in observableType) {
        list.add(ob)
    }
    return list
}

ApiSubscribeZipN does not compile. The error is: "None of the following functions can be called with the arguments supplied."


Solution

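  • You can use Observable.zipIterable, which accepts an Iterable of sources and hands the zipper one value from each source as an array:

    fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
        // Drop null sources, then zip; the zipper receives an Array<Any> with one value per source.
        Observable.zipIterable<T, List<T>>(observable.filterNotNull(), { it.toList() as List<T> }, false, 100)
            .subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer)
    }

    Because observer is declared after the vararg parameter, callers have to pass it as a named argument (observer = ...).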
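
    To try the idea outside an Android project, here is a minimal sketch that keeps only the zipping part and returns the zipped Observable instead of subscribing. The name zipToList is hypothetical (not from the question), and the sketch assumes RxJava 2 (io.reactivex) is on the classpath; schedulers are left out so it runs on a plain JVM.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.Function

// Hypothetical helper (not part of RxJava): zips any number of sources
// into a single Observable that emits one List<T> per "row" of values.
@Suppress("UNCHECKED_CAST")
fun <T> zipToList(vararg sources: Observable<T>?): Observable<List<T>> {
    // The vararg arrives as an Array; zipIterable wants an Iterable of non-null sources.
    val nonNull: List<Observable<T>> = sources.filterNotNull()
    return Observable.zipIterable<T, List<T>>(
        nonNull,
        // The zipper is called with an Array<Any> holding one value from each source.
        Function<Array<Any>, List<T>> { values -> values.map { it as T } },
        false,                   // delayError
        Observable.bufferSize()  // RxJava's default buffer size
    )
}

fun main() {
    // The null source is filtered out before zipping.
    val zipped = zipToList(Observable.just(1, 2), Observable.just(10, 20), null)
    zipped.blockingForEach { println(it) }
    // prints:
    // [1, 10]
    // [2, 20]
}
```

    Returning the Observable leaves the choice of schedulers and observer to the caller, and it avoids the named-argument requirement that comes from putting a parameter after the vararg.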