I have a dummy network data source:
fun networkDataSource(): Single<List<Int>> {
    return Single.just((0 until 100).toList())
        .delay(150, TimeUnit.MILLISECONDS)
}
Here is an endless observable. Its main purpose is to 'protect' its calculation, so that its single value (here, 1) is calculated only once.
val endless = Observable
    .just(1)
    .observeOn(Schedulers.io())
    .delay(500, TimeUnit.MILLISECONDS)
    // Counts as heavy operation, do not calculate this here once again
    .doOnNext { println("=> E: Calculated once") }
    .cache()
    //.doOnNext { println("=> E: From cache") }
    .repeat()
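To illustrate the "calculated once" behaviour, here is a minimal sketch, assuming RxJava 2 (the io.reactivex package). The names cacheReplayDemo and calculations are hypothetical, and the delay and scheduler from the snippet above are left out so that the sketch runs synchronously. It counts how often the source actually runs while repeat() keeps re-subscribing to the cached observable.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the emitted values and how often the source ran.
fun cacheReplayDemo(): Pair<List<Int>, Int> {
    val calculations = AtomicInteger()

    val cached = Observable
        .fromCallable {
            calculations.incrementAndGet() // stands in for the heavy operation
            1
        }
        .cache()

    // repeat() re-subscribes every time the cached source completes;
    // cache() answers each re-subscription by replaying the stored value.
    val values = cached
        .repeat()
        .take(3) // without a limit, this would never terminate
        .toList()
        .blockingGet()

    return values to calculations.get()
}
```

Calling cacheReplayDemo() should give the values [1, 1, 1] with a calculation count of 1: the heavy part runs once, and every further value is a replay from the cache.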
The main stream simply emits values:
val mainStream = Observable.range(0, 6)
    .doOnNext { println("=> M: Main stream $it") }
The task:
Zip the three observables together and optimize the network usage, so that the network is not called more often than necessary (that is, not after the required number of data items, integers in this case, has been reached).
Approach:
mainStream
    .concatMap { index ->
        Observables.zip(
            Observable.just(index),
            endless,
            networkDataSource()
                .toObservable()
                .doOnNext { println("#> N: Network data fetch $index") }
        )
    }
    .doOnNext { println("=> After concatmap: ${it.first}") }
    .take(4)
    .doOnNext { println("=> After take: ${it.first}") }
    .subscribe(
        { println("=> Last onnext") },
        { it.printStackTrace() },
        { synchronized(check) { check.notifyAll() } }
    )
Finishing the locked thread - only needed for testing:
synchronized(check) {
    check.wait()
}
println("Ending")
Here is the output:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
This is the whole output; it gets stuck after the second take (it doesn't proceed within a minute). My question is: why does that happen?
As a side note, if I uncomment this line in the endless observable:
.doOnNext { println("=> E: From cache") }
it floods the console with that line. Why is endless called so many times, instead of once for each iteration?
flatMap() is not a solution here, because it does not take take(4) into account and proceeds to finish all network calls.
So how could I make concatMap() work?
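The difference between the two operators under take(4) can be made visible by counting how many fetches get started. This is a minimal sketch, assuming RxJava 2; countedFetch and fetchesStarted are hypothetical names, and countedFetch is only a stand-in for networkDataSource() that emits its index after the same 150 ms delay.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for networkDataSource() that counts its subscriptions.
fun countedFetch(index: Int, calls: AtomicInteger): Observable<Int> =
    Single.just(index)
        .delay(150, TimeUnit.MILLISECONDS)
        .toObservable()
        .doOnSubscribe { calls.incrementAndGet() }

// Returns how many fetches were started by the time take(4) was satisfied.
fun fetchesStarted(useFlatMap: Boolean): Int {
    val calls = AtomicInteger()
    val source = Observable.range(0, 6)

    val mapped =
        if (useFlatMap) source.flatMap { index -> countedFetch(index, calls) }
        else source.concatMap { index -> countedFetch(index, calls) }

    // Block until four results have arrived; the results themselves are not needed.
    mapped.take(4).toList().blockingGet()
    return calls.get()
}
```

With flatMap, all six inner sources are subscribed as soon as range emits, so fetchesStarted(true) should return 6. With concatMap, the next inner source is only subscribed after the previous one completes, so fetchesStarted(false) should return 4.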
(I also added the RxJS tag because this is a reactive problem and not really about Kotlin. JS solutions are also welcome, if those functions exist in the RxJava library.)
Edit:
I looked into the code, and the 2 outputs are probably because of the prefetch parameter:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}
But I still don't understand how it works. I've only read that concatMap() is like flatMap(), except that it waits for the result of each inner source.
From the comments:
The whole setup will likely run on the same thread after the first item, and the repeat in endless will never give up the thread, preventing any other operator from progressing. It makes no sense to me to repeat that cache because you'll only ever use one and only item of it.
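Following the comment's suggestion, one option is to drop repeat() and let each zip re-subscribe to the cached observable instead. The following is a minimal sketch under that assumption, using RxJava 2 and RxKotlin's Observables.zip as in the question. RunStats, runWithoutRepeat, cachedOnce and the two counters are hypothetical names added only to make the result checkable.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.rxkotlin.Observables
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical result holder for this sketch.
data class RunStats(val indices: List<Int>, val networkCalls: Int, val calculations: Int)

fun runWithoutRepeat(): RunStats {
    val networkCalls = AtomicInteger()
    val calculations = AtomicInteger()

    // Same chain as `endless`, minus repeat(): it emits the value once and completes.
    val cachedOnce = Observable
        .just(1)
        .observeOn(Schedulers.io())
        .delay(500, TimeUnit.MILLISECONDS)
        .doOnNext { calculations.incrementAndGet() }
        .cache()

    val indices = Observable.range(0, 6)
        .concatMap { index ->
            Observables.zip(
                Observable.just(index),
                cachedOnce, // every new zip subscribes again and gets the replayed value
                Single.just((0 until 100).toList())
                    .delay(150, TimeUnit.MILLISECONDS)
                    .toObservable()
                    .doOnSubscribe { networkCalls.incrementAndGet() }
            )
        }
        .take(4)
        .map { it.first }
        .toList()
        .blockingGet() // blocks the caller, so no wait/notify lock is needed

    return RunStats(indices, networkCalls.get(), calculations.get())
}
```

In this sketch the indices 0 to 3 come through, the heavy calculation runs once, and only four network subscriptions are made, because take(4) disposes the chain before concatMap reaches the fifth item.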