How to group events by idle periods using Reactive Extensions


I have a problem for which Reactive Extensions seems particularly well suited. I have an event source that emits events in short bursts, with relatively long idle periods in between. I would like to group those events into batches so that (ideally) each burst ends up in its own batch. Is there a good way to do this with RxJava? Observable.buffer(Observable) and Observable.buffer(Func0) look promising, though Observable.window() or Observable.groupByUntil() might also work.


Solution

  • Here is code that seems to work as a debounced buffer. It closes the current batch whenever the source has been idle for 10 ms:

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;
    
    public class DebounceBuffer {
    
        public static void main(String[] args) {
            // Uncomment to see all bursts in a single sequence:
            // intermittentBursts().toBlocking().forEach(System.out::println);
    
            // Uncomment to debounce to the last value in each burst:
            // intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
    
            // The following emits each burst as one list, closing the list when the debounce fires.
            // First we multicast the stream; refCount() handles the subscribe/unsubscribe of the shared
            // source, so debounce and buffer both observe the same events.
            Observable<Integer> burstStream = intermittentBursts().publish().refCount();
            // then we get the debounced version
            Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
            // then the buffered one, which uses the debounced stream to mark the buffer boundaries
            Observable<List<Integer>> buffered = burstStream.buffer(debounced);
            // then we subscribe to the buffered stream so it does what we want
            buffered.take(20).toBlocking().forEach(System.out::println);
        }
    
        public static Observable<Integer> intermittentBursts() {
            return Observable.create((Subscriber<? super Integer> s) -> {
                while (!s.isUnsubscribed()) {
                    // burst some number of items
                    for (int i = 0; i < Math.random() * 20; i++) {
                        s.onNext(i);
                    }
                    try {
                        // sleep for a random amount of time
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        // ignore the interruption; the loop condition exits once the subscriber has unsubscribed
                    }
                }
            }).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block
        }
    
    }
    
    

    A sample run emits output like the following (burst sizes are random, so every run differs):

    [0, 1, 2, 3, 4, 5, 6, 7]
    [0, 1, 2, 3, 4, 5]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1]
    [0, 1, 2, 3, 4, 5]
    [0, 1, 2]
    [0, 1, 2, 3, 4, 5, 6, 7]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3, 4, 5, 6]
    [0, 1, 2]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3, 4]
    [0, 1, 2, 3]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3, 4, 5, 6, 7]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3]
    [0]
    [0, 1, 2]
    [0]
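
To make the batching rule itself easier to reason about, here is a minimal plain-Java sketch that does not use RxJava at all. It assumes the events have already been recorded as sorted millisecond timestamps, and it starts a new batch whenever the gap since the previous event is at least the idle threshold. The class name `IdleGapBatcher`, the method `groupByIdleGap`, and the sample timestamps are all hypothetical, chosen only for illustration. It approximates what the debounced buffer above does on a live stream and is not a replacement for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of RxJava): batches recorded timestamps by idle gaps.
class IdleGapBatcher {

    // Splits sorted timestamps (milliseconds) into batches. A new batch starts
    // whenever the gap since the previous event is at least idleMillis.
    static List<List<Long>> groupByIdleGap(List<Long> timestamps, long idleMillis) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : timestamps) {
            // Close the current batch if the source was idle long enough before this event.
            if (!current.isEmpty() && t - current.get(current.size() - 1) >= idleMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        // Flush the final batch, which has no later event to close it.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Assumed sample data: three bursts separated by long idle periods.
        List<Long> events = Arrays.asList(0L, 2L, 5L, 400L, 403L, 1200L);
        System.out.println(groupByIdleGap(events, 10L));
        // prints [[0, 2, 5], [400, 403], [1200]]
    }
}
```

The final flush mirrors how buffer emits its last list when the source completes. On a live stream the idle period has to be detected with a timer, since no later event is available to compare against, which is the role debounce plays in the answer above.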