javafx rx-java kotlin backpressure rx-kotlin

RxJava - Backpressuring keyboard inputs?


Here's a fun RxJava problem.

I want to use RxJava backpressure operators to look up a typed input as each character is typed, much like Google does on its search page. I went through the Backpressure documentation and came up with this (I am using RxJavaFX/RxKotlinFX to leverage JavaFX).

val myControl: Node = ...
val burstyMulticast = myControl.events(KeyEvent.KEY_TYPED).publish().refCount()
val burstyDebounced = burstyMulticast.debounce(200, TimeUnit.MILLISECONDS)
val burstyBuffered = burstyMulticast.buffer(burstyDebounced)

burstyBuffered
        .flatMap { it.toObservable().map { it.character }.reduce("") { x,y -> x + y } }
        .subscribe { println(it) }

This works great. If I type "Hello" into the control, it emits the String "Hello" after 200 ms of no typing. But to make this truly responsive, I need a rolling accumulation that emits on every keystroke. My console output should really look like this:

H
He
Hel
Hell
Hello

Those should be all my emissions when I type the word "Hello", and the 200 ms defines how much time without typing should elapse before the accumulation resets. How do I do this?
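To pin down the behavior being asked for, here is a minimal plain-Kotlin sketch of the intended semantics, with no RxJava involved. The names `KeyStroke`, `rollingAccumulate`, and `quietMs` are hypothetical and only illustrate the rule: emit the running text on every keystroke, and start over once the gap since the previous keystroke reaches the quiet period.

```kotlin
// Hypothetical model of one KEY_TYPED event: when it happened and what was typed.
data class KeyStroke(val timeMs: Long, val character: String)

// Returns every emission the stream should produce: the running text after each
// keystroke, restarting from empty whenever the gap since the previous keystroke
// is at least quietMs.
fun rollingAccumulate(strokes: List<KeyStroke>, quietMs: Long): List<String> {
    val emissions = mutableListOf<String>()
    var text = ""
    var lastTime: Long? = null
    for (stroke in strokes) {
        val previous = lastTime
        if (previous != null && stroke.timeMs - previous >= quietMs) {
            text = "" // quiet period elapsed, start a new accumulation
        }
        text += stroke.character
        emissions.add(text)
        lastTime = stroke.timeMs
    }
    return emissions
}

fun main() {
    val typed = listOf(
        KeyStroke(0, "H"), KeyStroke(50, "e"), KeyStroke(100, "l"),
        KeyStroke(150, "l"), KeyStroke(200, "o"),
        KeyStroke(600, "W"), KeyStroke(650, "o")
    )
    // prints H, He, Hel, Hell, Hello, W, Wo (one per line)
    rollingAccumulate(typed, quietMs = 200).forEach { println(it) }
}
```

The timestamps are made up for illustration; a reactive version has to produce the same emissions from live events rather than from a recorded list.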


Solution

  • Got it! I figured out that I don't actually want buffer(), but rather a switchMap() with a scan() inside of it. A multicast drives both the timer reset emission pushed into switchMap() and the scan(), which keeps concatenating typed characters until switchMap() disposes of it to reset the accumulation.

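    // Share one hot keystroke stream; each subscriber maps events to the typed character
    val burstyMulticast = events(KeyEvent.KEY_TYPED).publish().refCount().map { it.character }
    
    // throttleLast() samples on a fixed 1000 ms period; each sampled item acts as a reset signal,
    // and startWith("") kicks off the first accumulation immediately
    burstyMulticast.throttleLast(1000, TimeUnit.MILLISECONDS).startWith("")
            .switchMap {
                // switchMap() drops the previous scan() and starts a fresh accumulation
                burstyMulticast.scan { x,y -> x + y }
            }.subscribe { println(it) }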
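
Note that throttleLast() resets on a fixed sampling period rather than after a pause in typing, so the accumulation can restart mid-word. The plain-Kotlin sketch below models that timing under simplifying assumptions: the names `TimedKey` and `sampledAccumulate` are hypothetical, times are measured from the moment of subscription, and a keystroke landing exactly on a period boundary is counted toward the period that is closing. It is a rough model of the behavior, not the operator's implementation.

```kotlin
// Hypothetical model of one keystroke: time since subscription and the typed character.
data class TimedKey(val timeMs: Long, val character: String)

// Models a fixed sampling period: at the end of each period, the accumulation
// resets only if at least one keystroke arrived during that period.
// Assumption: a keystroke exactly on a boundary belongs to the closing period.
fun sampledAccumulate(keys: List<TimedKey>, periodMs: Long): List<String> {
    require(periodMs > 0) { "periodMs must be positive" }
    val emissions = mutableListOf<String>()
    var text = ""
    var periodEnd = periodMs
    var sawKeyThisPeriod = false
    for (key in keys) {
        // Close every period that ended before this keystroke.
        while (key.timeMs > periodEnd) {
            if (sawKeyThisPeriod) text = "" // a sampled keystroke triggers the reset
            sawKeyThisPeriod = false
            periodEnd += periodMs
        }
        text += key.character
        emissions.add(text)
        sawKeyThisPeriod = true
    }
    return emissions
}

fun main() {
    val typed = listOf(
        TimedKey(0, "H"), TimedKey(300, "e"), TimedKey(600, "l"),
        TimedKey(900, "l"), TimedKey(1200, "o")
    )
    // prints H, He, Hel, Hell, o (one per line): the reset at 1000 ms splits the word
    sampledAccumulate(typed, periodMs = 1000).forEach { println(it) }
}
```

If the reset should instead follow a pause in typing, as the question describes, debounce(200, TimeUnit.MILLISECONDS) in place of throttleLast() is the operator to try, since it only emits after the stream has been quiet for the given time.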