rx-java  system.reactive  reactive-programming  rx-android

Live count of emitted elements in Rx


Is there a way to count the number of elements that have already been processed in a stream in RxAndroid?

I have something like this:

Observable.fromArray(new String[]{"these", "are", "my", "values", "."})
            .map(s -> doSomeCoolStuff(s))
            // ...
            .subscribe(amountOfProcessedItems -> Log.d("test", "" + amountOfProcessedItems));

I am looking for something that makes the output look like 1 2 3 4 5: after each item, print the number of items that have been emitted so far.


Solution

  • Just count the elements:

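    // Needs: import java.util.concurrent.atomic.AtomicInteger;
    // The counter lives outside the chain, so it is shared if you subscribe more than once.
    AtomicInteger counter = new AtomicInteger();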
    Observable.fromArray(new String[]{"these", "are", "my", "values", "."})
            .map(s -> doSomeCoolStuff(s))
            // ...
            .subscribe(dummy -> Log.d("test", "" + counter.incrementAndGet()));
    

    Edit: If you just want to convert elements to increasing integers, here's how:

    // The range starts at 0 here; use Observable.range(1, Integer.MAX_VALUE) for 1-based counts.
    sourceObservable
    .zipWith(Observable.range(0, Integer.MAX_VALUE), (any, counter) -> counter)
    .whatever(...)
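
To try the counter idea from the answer above without Android or RxJava on the classpath, here is a minimal sketch that uses java.util.stream as a stand-in for the Observable chain. It is not RxJava code: the class name LiveCountSketch, the method runningCounts, and the body of doSomeCoolStuff are all assumptions made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LiveCountSketch {

    // Hypothetical stand-in for doSomeCoolStuff(s) from the question.
    static String doSomeCoolStuff(String s) {
        return s.toUpperCase();
    }

    // Processes each value, then replaces it with the number of items seen so far.
    static List<Integer> runningCounts(String... values) {
        // A fresh counter per call, so separate runs do not share state.
        AtomicInteger counter = new AtomicInteger();
        return Stream.of(values)
                .map(LiveCountSketch::doSomeCoolStuff)
                .map(processed -> counter.incrementAndGet())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runningCounts("these", "are", "my", "values", "."));
    }
}
```

Running main prints [1, 2, 3, 4, 5]. Creating the counter inside the method gives every run its own count. In RxJava itself, the scan operator is another way to keep a running total inside the chain, though that is an alternative the answer above does not use.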