rx-java, reactive-programming, rx-java2

RxJava opposite of the throttleFirst operator (not throttling but collecting)


I need an operator that, when an element arrives, starts a timer (opens a window for elements) and collects the elements into a List or an Observable/Flowable. If the timer expires and no element has arrived, the operator must not emit an empty event. When the next element arrives, a new timer is created and collection starts again.

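RxJava has the buffer and window operators, but these operators have a disadvantage: their time-based variants emit on every timer tick, even when no element has arrived, so they produce empty lists or windows.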

It is possible to filter out these empty elements, but I would like to avoid polluting the scheduler with empty, timer-driven events carrying Lists/Observables/Flowables.

I spent some time searching and found throttleFirst(long windowDuration, TimeUnit unit), which is formally very similar but functionally does the opposite: https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png

Unfortunately, it throttles items instead of collecting them.
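
To make the contrast concrete, here is a minimal, timing-free sketch in plain Java (no RxJava involved). It assumes items are given as sorted timestamps in milliseconds, and it reads the requirement above as "a window opens when an item arrives outside any window". The names `WindowModel`, `throttleFirst` and `collectPerWindow` are hypothetical and exist only for this illustration; the first method mimics what throttleFirst keeps, the second shows the collecting behaviour being asked for.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized model: item i arrives at times[i] (ms, ascending) and carries values[i].
final class WindowModel {

    // Keeps only the first value of each window, which is what throttling does.
    static List<Integer> throttleFirst(long[] times, int[] values, long window) {
        List<Integer> out = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE;          // no window is open yet
        for (int i = 0; i < times.length; i++) {
            if (times[i] >= windowEnd) {          // item arrives outside any window
                out.add(values[i]);               // emit it and open a new window
                windowEnd = times[i] + window;
            }                                     // otherwise the item is dropped
        }
        return out;
    }

    // Same windows, but every value that falls into a window is collected.
    static List<List<Integer>> collectPerWindow(long[] times, int[] values, long window) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = null;
        long windowEnd = Long.MIN_VALUE;
        for (int i = 0; i < times.length; i++) {
            if (times[i] >= windowEnd) {          // first item after a quiet period
                current = new ArrayList<>();      // a list exists only if it has an item
                out.add(current);
                windowEnd = times[i] + window;
            }
            current.add(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 100, 250, 1000, 1050, 3000};
        int[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(throttleFirst(times, values, 300));    // [1, 4, 6]
        System.out.println(collectPerWindow(times, values, 300)); // [[1, 2, 3], [4, 5], [6]]
    }
}
```

In this model no empty list can ever appear, because a list is created only when an item opens a window.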


Solution

  • I don't think you can save on object creation, so if you want to avoid empty lists, filter them out.

    As for starting a new periodic timer when a new item arrives late, I can't think of any combination of existing operators that could do it without possibly losing items.

    I created the following "contraption" that could do it without item loss:

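    // Imports for RxJava 2.x; they belong at the top of the enclosing file.
    import io.reactivex.Observable;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.Scheduler;
    import io.reactivex.disposables.SerialDisposable;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.subjects.PublishSubject;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Nested helper: declare it inside an enclosing class.
    public static final class BufferWithTimeout<T> {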
    
        // Serializes all state changes: tasks scheduled here run one at a time, in order.
        final Scheduler.Worker trampoline = Schedulers.trampoline().createWorker();
    
        final long timeout;
        
        final TimeUnit unit;
        
        final Scheduler.Worker worker;
        
        final SerialDisposable timer = new SerialDisposable();
        
        final PublishSubject<List<T>> output = PublishSubject.create();
        
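        // Buffer being filled; null while no timer is running.
        List<T> current;
        
        // Generation counter that lets ticks from an already stopped timer be ignored.
        long bufferIndex;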
        
        BufferWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
            this.worker = scheduler.createWorker();
            this.timeout = timeout;
            this.unit = unit;
        }
        
        // Called for each source item; starts a periodic timer if none is running.
        void onValue(T value) {
            trampoline.schedule(() -> {
                if (timer.isDisposed()) {
                    return;
                }
                if (current == null) {
                    current = new ArrayList<>();
                    long bi = ++bufferIndex;
                    timer.set(worker.schedulePeriodically(() -> {
                        onTime(bi);
                    }, timeout, timeout, unit));
                }
                current.add(value);
            });
        }
    
        // Called on each timer tick: emits a non-empty buffer, or stops the timer after an empty period.
        void onTime(long index) {
            trampoline.schedule(() -> {
                if (index == bufferIndex && current != null) {
                    if (current.isEmpty()) {
                        current = null;
                        bufferIndex++;
                        timer.set(null);
                    } else {
                        output.onNext(current);
                        current = new ArrayList<>();
                    }
                }
            });
        }
    
        // Called when the source ends: flushes the remaining items, then signals error or completion.
        void onTerminate(Throwable error) {
            timer.dispose();
            worker.dispose();
            trampoline.schedule(() -> {
                if (current != null && !current.isEmpty()) {
                    output.onNext(current);
                    current = null;
                }
                if (error != null) {
                    output.onError(error);
                } else {
                    output.onComplete();
                }
            });
        }
        
        // Called when the downstream disposes: stops the timer and drops the buffer.
        void dispose() {
            timer.dispose();
            worker.dispose();
            trampoline.schedule(() -> {
                current = null;
            });
        }
    
        public static <T> ObservableTransformer<T, List<T>> create(
                long timeout, TimeUnit unit, Scheduler scheduler) {
            return o -> 
                Observable.defer(() -> {
                    BufferWithTimeout<T> state = new BufferWithTimeout<>(
                        timeout, unit, scheduler);
    
                    return  o
                            .doOnNext(v -> state.onValue(v))
                            .doOnError(e -> state.onTerminate(e))
                            .doOnComplete(() -> state.onTerminate(null))
                            .ignoreElements()
                            .<List<T>>toObservable()
                            .mergeWith(state.output.doOnDispose(state::dispose));
                });
        }
    }
    
    

    You could try it via:

    // generate events over time
    Observable.fromArray(1, 2, 3, 5, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31)
            .flatMap(v -> Observable.timer(v * 100, TimeUnit.MILLISECONDS).map(w -> v))

            // apply operator
            .compose(BufferWithTimeout.create(
                    700, TimeUnit.MILLISECONDS, Schedulers.computation()
            ))

            // wait for it all
            .blockingSubscribe(System.out::println);
    
    

    Note, though, that this creates many more objects per source element. There is a way around it, but it would get much more complicated.
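
To reason about what the sample run should print, the timer logic of `BufferWithTimeout` can be traced with a timing-free model in plain Java. This is only a sketch under stated assumptions: item `v` arrives exactly at `v * 100` ms, nothing is delayed by the scheduler, and a tick that coincides with an item is treated as firing first (in the real operator that case is a race). `PeriodicBufferModel` and `simulate` are hypothetical names used for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized replay of the buffer logic: item i arrives at times[i] (ms, ascending).
final class PeriodicBufferModel {

    static List<List<Integer>> simulate(long[] times, int[] values, long timeout) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = null;   // null means: no timer is running
        long nextTick = 0;              // only meaningful while current != null
        for (int i = 0; i < times.length; i++) {
            // Replay every timer tick that fires before (or exactly when) this item arrives.
            while (current != null && nextTick <= times[i]) {
                if (current.isEmpty()) {
                    current = null;                 // empty period: stop the timer, emit nothing
                } else {
                    out.add(current);               // emit the collected items
                    current = new ArrayList<>();    // the periodic timer keeps running
                    nextTick += timeout;
                }
            }
            if (current == null) {                  // first item after a quiet period
                current = new ArrayList<>();
                nextTick = times[i] + timeout;      // restart the timer from this item
            }
            current.add(values[i]);
        }
        if (current != null && !current.isEmpty()) {
            out.add(current);                       // source completed: flush the remainder
        }
        return out;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 5, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31};
        long[] times = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            times[i] = values[i] * 100L;
        }
        // [[1, 2, 3, 5], [20, 21, 22, 23, 24, 25, 26], [27, 28, 29, 30, 31]]
        System.out.println(simulate(times, values, 700));
    }
}
```

A real run may group items near a tick boundary differently (for example, item 27 arrives at about the same moment as the 2700 ms tick), so the model shows the intended grouping rather than a guaranteed output.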