android rx-java2 kotlinx.coroutines

Android ViewState using RxJava or Kotlin coroutines


I'm trying to learn how to use RxJava in Android, but have run into a dead end. I have the following DataSource:

object DataSource {

    enum class FetchStyle {
        FETCH_SUCCESS,
        FETCH_EMPTY,
        FETCH_ERROR
    }

    var relay: BehaviorRelay<FetchStyle> = BehaviorRelay.createDefault(FetchStyle.FETCH_ERROR)

    fun fetchData(): Observable<DataModel> {
        return relay
            .map { f -> loadData(f) }
    }

    private fun loadData(f: FetchStyle): DataModel {
        Thread.sleep(5000)

        return when (f) {
            FetchStyle.FETCH_SUCCESS -> DataModel("Data Loaded")
            FetchStyle.FETCH_EMPTY -> DataModel(null)
            FetchStyle.FETCH_ERROR -> throw IllegalStateException("Error Fetching")
        }
    }
}

I want every change to the value of relay to trigger an update downstream, but that doesn't happen. The first emission arrives when the Activity is initialized, but later changes to the value never reach the subscriber. Here's my ViewModel, which is where I update the value:

class MainViewModel : ViewModel() {

    val fetcher: Observable<UiStateModel> = DataSource.fetchData().replay(1).autoConnect()
        .map { result -> UiStateModel.from(result) }
        .onErrorReturn { exception -> UiStateModel.Error(exception) }
        .startWith(UiStateModel.Loading())
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())

    fun loadSuccess() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_SUCCESS)
    }

    fun loadEmpty() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_EMPTY)
    }

    fun loadError() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_ERROR)
    }
}

This is the code from the Activity that does the subscription:

model.fetcher
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { uiState -> mainPresenter.loadView(uiState) }

Solution

  • I ended up using Kotlin coroutines instead, because I could not find a way to re-subscribe to the ConnectableObservable and start a new fetch.

    Here's the code for anyone interested.

    The presenter:

    class MainPresenter(val view: MainView) {
    
        private lateinit var subscription: SubscriptionReceiveChannel<UiStateModel>
    
        fun loadSuccess(model: MainViewModel) {
            model.loadStyle(DataSource.FetchStyle.FETCH_SUCCESS)
        }
    
        fun loadError(model: MainViewModel) {
            model.loadStyle(DataSource.FetchStyle.FETCH_ERROR)
        }
    
        fun loadEmpty(model: MainViewModel) {
            model.loadStyle(DataSource.FetchStyle.FETCH_EMPTY)
        }
    
        suspend fun subscribe(model: MainViewModel) {
            subscription = model.connect()
            subscription.subscribe { loadView(it) }
        }
    
        private fun loadView(uiState: UiStateModel) {
            when(uiState) {
                is Loading -> view.isLoading()
                is Error -> view.isError(uiState.exception.localizedMessage)
                is Success -> when {
                    uiState.result != null -> view.isSuccess(uiState.result)
                    else -> view.isEmpty()
                }
            }
        }
    
        fun unSubscribe() {
            subscription.close()
        }
    }
    
    suspend inline fun <E> SubscriptionReceiveChannel<E>.subscribe(action: (E) -> Unit) = consumeEach { action(it) }
    

    The view:

    ...
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            launch(UI) {
                mainPresenter.subscribe(model)
            }
    
            btn_load_success.setOnClickListener {
                mainPresenter.loadSuccess(model)
            }
    
            btn_load_error.setOnClickListener {
                mainPresenter.loadError(model)
            }
    
            btn_load_empty.setOnClickListener {
                mainPresenter.loadEmpty(model)
            }
        }
    
        override fun onDestroy() {
            super.onDestroy()
            Log.d("View", "onDestroy()")
            mainPresenter.unSubscribe()
        }
    ...
    

    The model:

    class MainViewModel : ViewModel() {
    
        val TAG = this.javaClass.simpleName
    
        private val stateChangeChannel = ConflatedBroadcastChannel<UiStateModel>()
    
        init {
            /** When the model is initialized we immediately start fetching data */
            fetchData()
        }
    
        override fun onCleared() {
            super.onCleared()
            Log.d(TAG, "onCleared() called")
            stateChangeChannel.close()
        }
    
        fun connect(): SubscriptionReceiveChannel<UiStateModel> {
            return stateChangeChannel.openSubscription()
        }
    
        fun fetchData() = async {
            stateChangeChannel.send(UiStateModel.Loading())
            try {
                val state = DataSource.loadData().await()
                stateChangeChannel.send(UiStateModel.from(state))
            } catch (e: Exception) {
                Log.e(TAG, "Exception while fetching data or sending the new state", e)
                // Surface the failure to subscribers so the view can leave the loading state.
                stateChangeChannel.send(UiStateModel.Error(e))
            }
        }
    
        internal fun loadStyle(style: DataSource.FetchStyle) {
            DataSource.style = style
            fetchData()
        }
    }
    

    And here's a link to the project on GitHub.
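
For readers who would rather stay with RxJava: a likely reason the original chain stops reacting is that the exception thrown inside map reaches the subscriber as a terminal error, and onErrorReturn emits one fallback item and then completes the stream. After that, later relay.accept(...) calls have no live subscription to deliver to. Because the relay defaults to FETCH_ERROR, this happens on the very first emission. One way around it is to catch the failure per item and emit it as an ordinary value. The sketch below shows only that conversion in plain Kotlin, with no RxJava dependency. The DataModel field name, the shape of UiStateModel, and the loadState helper are assumptions made for illustration, not the author's code.

```kotlin
// Hypothetical stand-ins for the types used in the question.
data class DataModel(val value: String?)

enum class FetchStyle { FETCH_SUCCESS, FETCH_EMPTY, FETCH_ERROR }

sealed class UiStateModel {
    object Loading : UiStateModel()
    data class Success(val result: String?) : UiStateModel()
    data class Error(val exception: Throwable) : UiStateModel()
}

// Same branching as loadData in the question, without the artificial delay.
fun loadData(f: FetchStyle): DataModel = when (f) {
    FetchStyle.FETCH_SUCCESS -> DataModel("Data Loaded")
    FetchStyle.FETCH_EMPTY -> DataModel(null)
    FetchStyle.FETCH_ERROR -> throw IllegalStateException("Error Fetching")
}

// Hypothetical helper: catch the failure for this one item and return it
// as a value, so no exception escapes to terminate a surrounding stream.
fun loadState(f: FetchStyle): UiStateModel =
    try {
        UiStateModel.Success(loadData(f).value)
    } catch (e: Exception) {
        UiStateModel.Error(e)
    }

fun main() {
    // An error in the first position no longer prevents later items
    // from being processed.
    listOf(FetchStyle.FETCH_ERROR, FetchStyle.FETCH_SUCCESS, FetchStyle.FETCH_EMPTY)
        .map { loadState(it) }
        .forEach { println(it) }
}
```

Under these assumptions, fetchData() in the question could map each relay value through loadState instead of loadData, so the error becomes an emitted item rather than a terminal event and the chain keeps receiving later values.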