Tags: android, rx-java2, rx-android, rx-kotlin2

share() operator not working for Observable in RxJava


I have an emitter that constantly emits data, like this:

fun subscribeForEvents(): Flowable<InkChannel> {
    return Flowable.create<InkChannel>({ emitter ->
        if (inkDevice.availableDeviceServices.contains(DeviceServiceType.EVENT_DEVICE_SERVICE)) {
            val eventService = inkDevice.getDeviceService(DeviceServiceType.EVENT_DEVICE_SERVICE) as EventDeviceService
            // Forward the device callbacks into the Flowable.
            eventService.subscribe(object : EventCallback {
                override fun onUserActionExpected(p0: UserAction?) {
                    emitter.onNext(InkChannel.UserActionEvent(p0))
                }

                override fun onEvent(p0: InkDeviceEvent?, p1: Any?) {
                    emitter.onNext(InkChannel.InkEvents<Any>(p0, p1))
                }

                override fun onUserActionCompleted(p0: UserAction?, p1: Boolean) {
                    // Not forwarded.
                }
            })
        }
    }, BackpressureStrategy.BUFFER).share()
}

Now I have a service that I start on application launch, and it listens to these events:

inkDeviceBus.subscribeForEvents()
    // ofType drops the other InkChannel subtypes instead of failing on a cast
    .ofType(InkChannel.InkEvents::class.java)
    .filter { it.event == InkDeviceEvent.STATUS_CHANGED }
    .map { it.value.toString() }
    .filter { value -> value == "CONNECTED" || value == "DISCONNECTED" }
    .map { it == "CONNECTED" }
    .subscribeBy { connected ->
        if (connected) stopSelf()
    }

Another activity, MainActivity, is started on launch and observes the same events. The problem is that only the listener in the service receives the events; the activity receives none.

When I remove the listener from the service, the activity starts receiving events. I used the share() operator to share the Flowable, but it doesn't seem to work.


Solution

  • .share() only affects the instance it is called on. Since each call to subscribeForEvents() creates a new Flowable and applies .share() to that new instance, the subscribers never have anything in common to share, so .share() does not change the behavior.

    You need to call subscribeForEvents() once and then reuse the returned value (for example, by keeping it in a property) wherever you want to receive the events. As long as the same object is used, all subscribers share the underlying listener.
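
A minimal sketch of that advice, assuming RxJava 2 is on the classpath. FakeDevice, the InkDeviceBus class, and the String payload are hypothetical stand-ins for the real ink device API, which is not shown in the question. The shared Flowable is built once and cached in a property, so every caller of subscribeForEvents() gets the same instance:

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical stand-in for the device service: it keeps a single listener.
class FakeDevice {
    private var listener: ((String) -> Unit)? = null
    var registrations = 0
        private set

    fun subscribe(l: (String) -> Unit) {
        registrations++
        listener = l
    }

    fun emit(value: String) {
        listener?.invoke(value)
    }
}

// Hypothetical bus class: the Flowable is created once, not on every call.
class InkDeviceBus(private val device: FakeDevice) {
    private val events: Flowable<String> =
        Flowable.create<String>({ emitter ->
            // Runs once, when the first subscriber connects to the shared source.
            device.subscribe { emitter.onNext(it) }
        }, BackpressureStrategy.BUFFER).share()

    // Every caller receives the same shared instance.
    fun subscribeForEvents(): Flowable<String> = events
}

// Subscribes twice (as the service and the activity would) and emits one event.
fun runDemo(): Triple<List<String>, List<String>, Int> {
    val device = FakeDevice()
    val bus = InkDeviceBus(device)
    val service = mutableListOf<String>()
    val activity = mutableListOf<String>()

    bus.subscribeForEvents().subscribe { service.add(it) }
    bus.subscribeForEvents().subscribe { activity.add(it) }

    device.emit("CONNECTED")
    return Triple(service, activity, device.registrations)
}

fun main() {
    val (service, activity, registrations) = runDemo()
    println("service=$service activity=$activity registrations=$registrations")
}
```

With this arrangement both subscribers should see the event while the device listener is registered only once. In an Android app the bus itself would also need to be a single instance (for example, held by the Application or provided as a singleton by a DI framework), otherwise each component would again build its own Flowable.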