java rx-java2

Using RxJava for Grouped Moving Average


I can easily calculate the moving average of a simple integer list like this:

Integer[] arr = {1, 2, 3, 4, 5, 6};
Observable<Integer> oi = Observable.fromArray(arr);
// Window of 24 items, advancing by 1 item each time
oi.buffer(24, 1).subscribe(x -> average(x));

Now let's say that I have objects instead of integers, like:

private class Model{
  public String key;
  public Double value;
}

I want to group these by key and calculate the moving average per key in a non-blocking fashion (I am receiving a continuous stream from RabbitMQ), in a way that emits {key->average} values.
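
To pin down the expected output, here is a minimal plain-Java sketch (no RxJava) of a per-key moving average. The class name `GroupedMovingAverage`, its `add` method, and the window size of 2 are illustrative assumptions, not part of the original code. Unlike a fixed-size buffer, this sketch also reports an average before a key's window is full.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class GroupedMovingAverage {
    private final int windowSize;
    // One sliding window of recent values per key.
    private final Map<String, Deque<Double>> windows = new HashMap<>();

    public GroupedMovingAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Records a value for the key and returns the average of that key's
    // most recent values (at most windowSize of them).
    public double add(String key, double value) {
        Deque<Double> window = windows.computeIfAbsent(key, k -> new ArrayDeque<>());
        window.addLast(value);
        if (window.size() > windowSize) {
            window.removeFirst(); // drop the oldest value
        }
        double sum = 0.0;
        for (double v : window) {
            sum += v;
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        GroupedMovingAverage avg = new GroupedMovingAverage(2);
        System.out.println("a -> " + avg.add("a", 1.0));  // a -> 1.0
        System.out.println("b -> " + avg.add("b", 10.0)); // b -> 10.0
        System.out.println("a -> " + avg.add("a", 3.0));  // a -> 2.0
        System.out.println("a -> " + avg.add("a", 5.0));  // a -> 4.0
    }
}
```

Each key keeps its own window, so values for "b" never affect the average for "a".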

I am aware of the groupBy operator, but things get messy when I use it. What is the best way to do this with groupBy?


Solution

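  • You can do this by windowing each group separately. (Collecting a group with toList() would only emit once the source completes, which a continuous stream never does.)

    Observable<Model> oi = ...;
    oi.groupBy(model -> model.key)
        // Each group is its own stream; once a key has 24 items,
        // buffer(24, 1) emits that key's latest 24 on every new item.
        .flatMap(group -> group
            .buffer(24, 1)
            .map(window -> new java.util.AbstractMap.SimpleEntry<>(
                group.getKey(), average(window))))
        .subscribe(entry -> { // entry.getKey() -> entry.getValue() (moving average)
            //...
        });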
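
The snippet above calls an `average` helper that is never shown. A minimal sketch, assuming it takes the list of `Model` objects for one key and returns the arithmetic mean of their `value` fields; the wrapper class `MovingAverageSketch`, the `Model` constructor, and the NaN result for an empty list are assumptions made for illustration:

```java
import java.util.Arrays;
import java.util.List;

public class MovingAverageSketch {

    // Mirrors the Model class from the question (static and public here
    // so the example compiles on its own).
    public static class Model {
        public final String key;
        public final Double value;

        public Model(String key, Double value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical helper: arithmetic mean of the values in one list.
    // Returns NaN for an empty list (an assumption; choose what suits you).
    public static double average(List<Model> models) {
        if (models.isEmpty()) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (Model m : models) {
            sum += m.value;
        }
        return sum / models.size();
    }

    public static void main(String[] args) {
        List<Model> models = Arrays.asList(
                new Model("a", 1.0), new Model("a", 2.0), new Model("a", 6.0));
        System.out.println(average(models)); // prints 3.0
    }
}
```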
    

    Hope it helps!