According to the documentation of groupBy:
Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(int)(0) to them.
There's an RxJava tutorial which says:
Internally, every Rx operator does 3 things
- It subscribes to the source and observes the values.
- It transforms the observed sequence according to the operator's purpose.
- It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
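To make the three steps concrete, here is a minimal sketch of a filter-like operator in plain Java. MiniObserver, MiniObservable, OperatorSketch and the helper methods are hypothetical names invented for this illustration; this is not RxJava's actual implementation, and it leaves out unsubscription, backpressure and error handling. It only shows that such an operator subscribes to its source and inspects each value, without subscribing to the value itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical miniature types for illustration only; these are NOT RxJava's classes.
class OperatorSketch {

    /** Receives values through the three callbacks named in the list above. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** A source that starts emitting when an observer subscribes. */
    interface MiniObservable<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** Emits start, start + 1, ..., start + count - 1, then completes. */
    static MiniObservable<Integer> range(int start, int count) {
        return observer -> {
            for (int i = start; i < start + count; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        };
    }

    /** A filter-like operator written as the three steps. */
    static <T> MiniObservable<T> filter(MiniObservable<T> source, Predicate<T> predicate) {
        // Step 1: subscribe to the source (only the source, never the values it emits).
        return downstream -> source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T value) {
                // Step 2: transform the sequence, here by keeping matching values only.
                if (predicate.test(value)) {
                    // Step 3: push the result to our own subscriber.
                    downstream.onNext(value);
                }
            }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Collects the even numbers of range(0, count) into a list. */
    static List<Integer> evens(int count) {
        List<Integer> seen = new ArrayList<>();
        filter(range(0, count), i -> i % 2 == 0).subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { seen.add(value); }
            @Override public void onError(Throwable error) { throw new RuntimeException(error); }
            @Override public void onCompleted() { }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(evens(10)); // prints [0, 2, 4, 6, 8]
    }
}
```

In this sketch, if the emitted values were themselves observables, the predicate would only look at them; nothing in filter would ever call subscribe on a rejected value.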
Let's take a look at the following code block which extracts only even numbers from range(0, 10):
Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .filter(g -> g.getKey() % 2 == 0)
    .flatMap(g -> g)
    .subscribe(System.out::println, Throwable::printStackTrace);
My questions are:
- Does it mean the filter operator already implies a subscription to every group produced by groupBy, or only to the outer Observable<GroupedObservable>?
- Will there be a memory leak in this case?
- If so, how do I properly discard those groups? Should I replace filter with a custom operator that does a take(0) on the rejected group and then returns Observable.empty()? You may ask why I don't just return take(0) directly: it's because filter doesn't necessarily follow right after groupBy, but can be anywhere in the chain and involve more complex conditions.
Your suspicions are correct: to handle the grouped observable properly, each of the inner observables (g) must be subscribed to. Since filter subscribes only to the outer observable, the groups it rejects are never subscribed to, so using it here is a bad idea. Instead, do the selection inside flatMap, using ignoreElements to subscribe to the undesired groups while discarding their items.
Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
        if (g.getKey() % 2 == 0)
            return g;
        else
            return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);