Search code examples
kotlinrx-javarx-java2

Observable that buffers items when no subscribers, then emits them and clears the buffer when it is subscribed to?


I want an observable that:

  1. Can emit items on demand and never really completes (a hot observable?)
  2. Is aware of when it has subscribers or not
  3. If no subscribers, it will buffer items that I tell it to emit
  4. When subscribed, it will emit the buffered items in order, then clear the buffer, and then continue to allow me to emit more items
  5. When unsubscribed (subscriber is disposed?), it will go back to buffering.

Also:

  1. There is only expected to be one subscriber at a time

  2. It does not need to be thread safe

Here's sort of a pseudocode of what I am thinking -- I don't have the necessary callbacks though to do this the right way. Also it would be nice if I could wrap it all up in an Observable or Subject.

class RxEventSender {
    private val publishSubject = PublishSubject.create<Action>()

    val observable: Observable<Action> = publishSubject

    private val bufferedActions = arrayListOf<Action>()

    private var hasSubscribers = false

    fun send(action: Action) {
        if (hasSubscribers) {
            publishSubject.onNext(action)
        } else {
            bufferedActions.add(action)
        }
    }

    //Subject was subscribed to -- not a real callback
    fun onSubscribed() {
        hasSubscribers = true
        bufferedActions.forEach {action ->
            publishSubject.onNext(action)
        }
        bufferedActions.clear()
    }

    //Subject was unsubscribed -- not a real callback
    fun onUnsubscribed() {
        hasSubscribers = false
    }
}

Solution

  • After spending some time, I think this works well enough for my needs. It's not wrapped up nicely in a Subject or Observable but all I needed was to emit items and subscribe.

    class RxEventSender<T> {
        private val bufferedEvents = arrayListOf<T>()
    
        private val publishSubject = PublishSubject.create<T>()
    
        val observable: Observable<T> = publishSubject
                .mergeWith(Observable.fromIterable(bufferedEvents))
                .doOnDispose {
                    bufferedEvents.clear()
                }
    
        fun send(event: T) {
            if (publishSubject.hasObservers()) {
                publishSubject.onNext(event)
            } else {
                bufferedEvents.add(event)
            }
        }
    }