java, scala, reactive-programming, rx-java, reactivex

ReactiveX: calculate frequency of distinct elements in an Observable


I have an Observable<String>. I would like to turn this into a Map<String, Int> which tells me the number of occurrences for each distinct string.

The Observable contains ~1 billion elements, of which only ~1000 are distinct, so storing the entire dataset in RAM is not an option. Currently I iterate over the Observable and update a HashMap, making sure to observe on a single thread to avoid race conditions. However, counting element frequencies should be inherently easy to parallelise, so it would be nice to take advantage of that.
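To illustrate why the counting should parallelise well, here is a minimal sketch in plain Java (java.util.concurrent, not RxJava): counts are associative, so each thread can count its own chunk into a private HashMap with no locking, and the partial maps are merged afterwards by summing per key. The names `PartialCounts`, `countChunk` and `countParallel` are hypothetical, and the input is assumed to be an in-memory `List`, which the real billion-element stream would not be; only the split/count/merge structure is the point.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartialCounts {

    // Count one chunk sequentially into its own private HashMap (no shared state).
    static Map<String, Long> countChunk(List<String> chunk) {
        Map<String, Long> counts = new HashMap<>();
        for (String s : chunk) {
            counts.merge(s, 1L, Long::sum);
        }
        return counts;
    }

    // Split the input into chunks, count each chunk on a worker thread,
    // then merge the partial maps by summing the counts for each key.
    static Map<String, Long> countParallel(List<String> input, int chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(chunks);
        try {
            List<Future<Map<String, Long>>> partials = new ArrayList<>();
            int size = (input.size() + chunks - 1) / chunks; // ceiling division
            for (int start = 0; start < input.size(); start += size) {
                List<String> chunk = input.subList(start, Math.min(start + size, input.size()));
                partials.add(pool.submit(() -> countChunk(chunk)));
            }
            Map<String, Long> total = new HashMap<>();
            for (Future<Map<String, Long>> f : partials) {
                f.get().forEach((k, v) -> total.merge(k, v, Long::sum));
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("counting failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "b", "a", "a");
        System.out.println(countParallel(input, 4));
    }
}
```

Because the merge step only touches one entry per distinct key (~1000 here), its cost is small compared with the per-element counting done in parallel.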

Is there a way to do that?


Solution

  • You can use groupBy instead of maintaining the HashMap yourself. groupBy creates an Observable for each key, and each of these groups can then be processed on a different Scheduler. E.g.,

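    import org.junit.Test;
    import rx.Observable;
    import rx.schedulers.Schedulers;

    // Holds the final count for one distinct key.
    public class KeyCounter {
        final int key;
        final long count;

        public KeyCounter(int key, long count) {
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return "key: " + key + " count: " + count;
        }
    }

    @Test
    public void foo() {
        Observable<Integer> o = Observable.just(1, 2, 3, 2, 1);
        o.groupBy(i -> i)                                   // one GroupedObservable per distinct key
            .flatMap(group -> group
                // observeOn (rather than subscribeOn) moves the processing of each
                // group's items onto a computation thread; the groups are fed by
                // the upstream thread, so subscribeOn alone would not parallelise them
                .observeOn(Schedulers.computation())
                .countLong()                                // RxJava 1.x: emits the group size as a Long
                .map(count -> new KeyCounter(group.getKey(), count)))
            .toBlocking()                                   // block until every group has completed,
            .forEach(System.out::println);                  // instead of sleeping for a fixed time
    }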
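In RxJava 1.x, the stream of `KeyCounter` objects above could presumably be collected into the map the question asks for by appending `toMap(kc -> kc.key, kc -> kc.count)`. For comparison, here is a minimal sketch of the same group-then-count idea using only the JDK's java.util.stream (a swapped-in technique, not RxJava): `Collectors.groupingByConcurrent` with a `counting()` downstream collector builds the frequency map from a parallel stream. `GroupCount` and `frequencies` are hypothetical names, and the sample input mirrors the test data above, as strings.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupCount {

    // Group equal strings together and count each group. With a parallel stream,
    // groupingByConcurrent lets all worker threads accumulate into one ConcurrentMap
    // instead of building and merging per-thread maps.
    static Map<String, Long> frequencies(List<String> words) {
        return words.parallelStream()
            .collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = frequencies(List.of("1", "2", "3", "2", "1"));
        System.out.println(counts);
    }
}
```

The result type is a `Map<String, Long>` rather than `Map<String, Int>`; `Long` is used because `counting()` produces `Long`, which also avoids overflow when a single key occurs more than about 2.1 billion times.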