Search code examples
javakotlinrx-java2rx-kotlin

How to dynamically scale debounce of burst emission stream?


I've got a buffered stream, waiting for a predetermined amount of silence time, before publishing a list of elements that have been buffered:

INTEGERS
     .share()
     .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler))
     .map { durations ->
       ... 
     }

I'd like to make DEBOUNCE_TIME dynamically adjust depending on the average of the buffered items, but I'm having a hard time figuring out how to achieve this.


Solution

  • You could defer the debounce, take one item of it and trigger repeat once the new debounce value has been determined:

    int DEBOUNCE_TIME = 100;
    AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME);
    PublishSubject<Integer> mayRepeat = PublishSubject.create();
    
    AtomicInteger counter = new AtomicInteger();
    
    Observable<Integer> INTEGERS =
            Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200)
            .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS)
                    .map(w -> counter.incrementAndGet()));
    
    INTEGERS.publish(o ->
            o.buffer(
                Observable.defer(() ->
                    o.debounce(
                        debounceTime.get(), TimeUnit.MILLISECONDS)
                )
                .take(1)
                .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b))
            )
        )
        .map(list -> {
            int nextDebounce = Math.min(100, list.size() * 100);
            debounceTime.set(nextDebounce);
            mayRepeat.onNext(1);
            return list;
        })
        .blockingSubscribe(System.out::println);
    

    This prints:

    [1, 2]
    [3, 4, 5]
    [6, 7, 8, 9]
    [10]