I m now working on about the rxjava2
in android kotlin . And try to write a function to zip multiple observable and observer. But it seems like some mistake. Can anyone help?
First , i try to write zip 2 observable and it works. but when I want to extend it to vararg , it fails.
fun <T> ApiSubscribeZip2(observable1: Observable<T>, observable2: Observable<T>, observer: Observer<List<T>>) {
Observable.zip(observable1, observable2, BiFunction<T, T, List<T>> { t1: T, t2: T ->
zipAdd(t1, t2)
})
?.subscribeOn(Schedulers.io())?.unsubscribeOn(Schedulers.io())?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(observer as Observer<in List<T>>)
}
fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
Observable.zip(observable, Function<T, List<T>> { it ->
zipAdd(it)
})
}
private fun <T> zipAdd(vararg observableType: T): List<T> {
val list = ArrayList<T>()
for (ob in observableType) {
list.add(ob)
}
return list
}
the apisubscribezipN shows that None of the following function can be called with the arguments supplied.
You can use Observable.zipIterable
like this:
fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
Observable.zipIterable<T, List<T>>(observable.filterNotNull().toList(), { it.toList() as List<T>? }, false, 100)
.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.io())
.subscribe(observer)
}