Here's a fun RxJava problem.
I want to use RxJava backpressure operators to look up typed input as each character is typed, much like Google does on its search page. I went through the Backpressure documentation and came up with this (I am using RxJavaFX/RxKotlinFX to leverage JavaFX).
val myControl: Node = ...
val burstyMulticast = myControl.events(KeyEvent.KEY_TYPED).publish().refCount()
val burstyDebounced = burstyMulticast.debounce(200, TimeUnit.MILLISECONDS)
val burstyBuffered = burstyMulticast.buffer(burstyDebounced)
burstyBuffered
.flatMap { it.toObservable().map { it.character }.reduce("") { x,y -> x + y } }
.subscribe { println(it) }
This works great. If I type "Hello" against the control, it emits the String "Hello" after 200 ms of no typing. But to make this truly responsive, I want a rolling accumulation that emits on every keystroke. My console output should then look like this:
H
He
Hel
Hell
Hello
Those should be all my emissions when I type the word "Hello", and the 200 ms defines how long typing must pause before the accumulation resets. How do I do this?
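To pin down the expected behavior, here is a plain-Kotlin model of it with no Rx involved. It is only a sketch of the output I am after, not a solution: the `Key` class, the `rollingAccumulate` function, and the timestamps are all hypothetical names and values made up for illustration.

```kotlin
// Hypothetical model of one keystroke: the typed character and its arrival time.
data class Key(val char: String, val atMillis: Long)

// Returns every emission the pipeline should produce: a running concatenation
// that restarts whenever the pause since the previous key reaches resetAfterMillis.
fun rollingAccumulate(keys: List<Key>, resetAfterMillis: Long = 200): List<String> {
    val emissions = mutableListOf<String>()
    var current = ""
    var lastAt: Long? = null
    for (key in keys) {
        val previous = lastAt
        // A long enough pause resets the accumulation before this key is appended.
        if (previous != null && key.atMillis - previous >= resetAfterMillis) {
            current = ""
        }
        current += key.char
        emissions += current
        lastAt = key.atMillis
    }
    return emissions
}

fun main() {
    val typed = listOf(
        Key("H", 0), Key("e", 50), Key("l", 100), Key("l", 150), Key("o", 200),
        Key("W", 1000) // arrives 800 ms after "o", so the accumulation restarts
    )
    println(rollingAccumulate(typed)) // prints [H, He, Hel, Hell, Hello, W]
}
```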
Got it! I figured out that I actually don't want buffer(), but rather a switchMap() with a scan() inside of it. I use a multicast to drive both the timer reset emission pushed into switchMap() and the scan(), which keeps concatenating typed characters until the switchMap() kills it to reset.
val burstyMulticast = events(KeyEvent.KEY_TYPED).publish().refCount().map { it.character }
burstyMulticast.throttleLast(1000, TimeUnit.MILLISECONDS).startWith("")
.switchMap {
burstyMulticast.scan { x,y -> x + y }
}.subscribe { println(it) }
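Note that throttleLast() fires on a fixed interval while keys are arriving, so the snippet above resets on a periodic tick rather than after a pause in typing. If the reset should follow the "200 ms of no typing" rule from the question, one possible variation is to swap in debounce(). The sketch below is my substitution, not something tested against JavaFX: it assumes RxJava 2.x (the `io.reactivex` packages, where `startWith("")` is available), and uses a `PublishSubject` plus a `TestScheduler` as hypothetical stand-ins for the real KEY_TYPED stream and clock. Because the subject is already hot, publish().refCount() is not needed here.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Simulates typing "Hello" quickly, pausing, then typing "W".
fun typeHelloThenW(): List<String> {
    val scheduler = TestScheduler()             // virtual clock, so no real waiting
    val keys = PublishSubject.create<String>()  // stand-in for the multicast key stream
    val seen = mutableListOf<String>()

    keys.debounce(200, TimeUnit.MILLISECONDS, scheduler) // fires after 200 ms of silence
        .startWith("")                                   // kicks off the first accumulation
        .switchMap { keys.scan { x, y -> x + y } }       // each firing restarts the scan
        .subscribe { seen += it }

    for (c in "Hello") {
        keys.onNext(c.toString())
        scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS) // 50 ms between keystrokes
    }
    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS)    // pause; debounce fires, scan resets
    keys.onNext("W")
    return seen
}

fun main() {
    println(typeHelloThenW()) // prints [H, He, Hel, Hell, Hello, W]
}
```

On RxJava 3.x the same idea should apply with `startWithItem("")` and the `io.reactivex.rxjava3` packages.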