android, kotlin, android-room, kotlin-flow

A Flow subscription doesn't react when I change the DB from a class that has no subscription


I have an app that uses Room and Hilt (all modules are installed in SingletonComponent::class). It has 2 interfaces:

  1. SourceOfData - subscribes to the Room Flows and puts the values into a SharedFlow that many view models use
  2. SomeUseCase - writes changes to the DB

interface SomeUseCase {
    suspend fun addPerson(person: Person)
    suspend fun deletePersonById(personId: Int)
    suspend fun addItemByPersonId(personId: Int, item: Item)
    suspend fun deleteItemById(itemId: Int)
}

interface SourceOfData {
    fun personSharedFlowForMap(): SharedFlow<List<Person>>

    //Test actions
    suspend fun addPerson(person: Person)
    suspend fun deletePersonById(personId: Int)
    suspend fun addItemByPersonId(personId: Int, item: Item)
    suspend fun deleteItemById(itemId: Int)
}

SourceOfDataImpl

class SourceOfDataImpl @Inject constructor(
    private val personDao: PersonDao,
    private val itemDao: ItemDao
) : SourceOfData {
 
    private val _personFlowMap = MutableSharedFlow<List<Person>>()
    val personFlowMap: SharedFlow<List<Person>> = _personFlowMap.asSharedFlow()
 
    init {
        subscribeToPersonMapWithItemDao()
    }
 
    private fun subscribeToPersonMapWithItemDao() {
        CoroutineScope(Dispatchers.IO).launch {
            combine(
                personDao.getPersons(),
                itemDao.getItems()
            ) { personEntityList, itemEntityList ->
                personEntityList.map { personEntity ->
                    Person(
                        personEntity.id,
                        personEntity.name,
                        itemEntityList
                            .filter { it.personId == personEntity.id }
                            .map { it.toItem() }
                    )
                }
            }.collect {
                _personFlowMap.emit(it)
            }
        }
    }
...
}

BaseViewModel

abstract class BaseViewModel(private val sourceOfData: SourceOfData) : ViewModel() {
    abstract fun updateUiStateMap(personUiMap: PersonUiMap)

    fun subscribeToDataMap() {
        viewModelScope.launch {
            sourceOfData.personSharedFlowForMap().collect {
                val personList = it
                updateUiStateMap(convertDataListToUiMap(personList))
            }
        }
    }
}

The problem: If I update the DB through SomeUseCase, the DB changes but the SharedFlow emits nothing. If I make the same change through the SourceOfData interface, the app works correctly.

MainViewModel

    //TODO("Example")
    suspend fun addPerson(personName: String) {
//        someUseCase.addPerson(Person(name = personName)) //TODO("Not OK")
//        sourceOfData.addPerson(Person(name = personName)) //TODO("OK")
    }

The question: How can I solve this issue correctly?

Of course, I could use a single interface for all the DAOs and avoid the problem, but I don't think that's a good approach.


Solution

  • Creating a new CoroutineScope out of thin air somewhere in the internals of a data source, as you do in subscribeToPersonMapWithItemDao, defeats the purpose of structured concurrency: that coroutine cannot be controlled or cancelled from anywhere. Instead, you should pass the scope that the shared flow should use as a parameter. That also makes testing a lot easier because you can inject a test scope. Use your dependency injection framework to supply an appropriate scope.
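
A minimal sketch of how Hilt could supply such a scope. The `@ApplicationScope` qualifier and the `CoroutineScopeModule` name are made up for illustration and are not part of the question's code; the choice of `SupervisorJob` and `Dispatchers.IO` is also an assumption.

```kotlin
import dagger.Module
import dagger.Provides
import dagger.hilt.InstallIn
import dagger.hilt.components.SingletonComponent
import javax.inject.Qualifier
import javax.inject.Singleton
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob

// Hypothetical qualifier so this binding is not confused with any other CoroutineScope.
@Qualifier
@Retention(AnnotationRetention.BINARY)
annotation class ApplicationScope

@Module
@InstallIn(SingletonComponent::class)
object CoroutineScopeModule {

    // One long-lived scope for the whole app. SupervisorJob keeps a failure in
    // one child coroutine from cancelling its siblings.
    @Provides
    @Singleton
    @ApplicationScope
    fun provideApplicationScope(): CoroutineScope =
        CoroutineScope(SupervisorJob() + Dispatchers.IO)
}
```

With this in place, SourceOfDataImpl could take `@ApplicationScope private val scope: CoroutineScope` as a third constructor parameter, and a unit test could pass a test scope (for example `TestScope` from kotlinx-coroutines-test) instead.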

    Now to the problem at hand. In general you should collect flows only in the UI. This is certainly true for what you do here in SourceOfDataImpl. Instead, you should just transform the flow like this:

    val personFlowMap: SharedFlow<List<Person>> = combine(
        personDao.getPersons(),
        itemDao.getItems(),
    ) { personEntityList, itemEntityList ->
        personEntityList.map { personEntity ->
            Person(
                personEntity.id,
                personEntity.name,
                itemEntityList
                    .filter { it.personId == personEntity.id }
                    .map { it.toItem() },
            )
        }
    }
        // only needed when the scope isn't already using that dispatcher, which it probably will when you inject it
        .flowOn(Dispatchers.IO)
        .shareIn(
            scope = scope, // the replacement for CoroutineScope() that should be provided as a parameter
            started = SharingStarted.WhileSubscribed(5_000),
            replay = 1,
        )
    

    The same goes for the view model: it shouldn't collect flows either. It should just transform the flow from the data source into a UI state object. In the same way that the data source uses combine to merge the flows, your view model should use mapLatest to transform the raw data into a UI state object. Finally, the flow should be converted to a StateFlow and assigned to a property in the view model. That works very similarly to how the code above creates a SharedFlow; you just use stateIn instead of shareIn. You need to specify an initial value, which the StateFlow holds until the upstream flows emit their first value. The UI can then collect the flow from the view model.
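
The view-model side might look roughly like the sketch below. `Person` and `PersonUiMap` are simplified stand-ins for the question's types, `convertDataListToUiMap` is given an assumed body, and `personUiState` is a hypothetical helper name. The scope is a parameter so that the snippet depends only on kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.stateIn

// Simplified stand-ins for the question's types (assumptions, not the real classes).
data class Person(val id: Int, val name: String)
data class PersonUiMap(val namesById: Map<Int, String> = emptyMap())

// Assumed conversion; the question does not show the real implementation.
fun convertDataListToUiMap(persons: List<Person>): PersonUiMap =
    PersonUiMap(persons.associate { it.id to it.name })

// Transforms the data-layer flow into UI state without collecting it here.
// mapLatest is still marked experimental in kotlinx.coroutines, hence the opt-in.
@OptIn(ExperimentalCoroutinesApi::class)
fun personUiState(
    persons: Flow<List<Person>>,
    scope: CoroutineScope,
): StateFlow<PersonUiMap> =
    persons
        .mapLatest { convertDataListToUiMap(it) }
        .stateIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = PersonUiMap(), // held until the upstream emits
        )
```

Inside a ViewModel this would become a property such as `val uiState = personUiState(sourceOfData.personSharedFlowForMap(), viewModelScope)`, and only the UI layer would collect `uiState`.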

    Letting only your UI collect the flows should most likely fix your problem.

    One last comment: You should probably move the flow transformation from your data source to a repository class. That's also where the flows should be converted to SharedFlows. Your view models then will only communicate with the repositories.
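
As a rough illustration of that last point, a repository could own the combined flow while the DAOs stay plain data sources. Everything below is a sketch: the entity, DAO and model declarations are simplified stand-ins for the question's Room classes, and `PersonRepository` and `joinPersonsWithItems` are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.shareIn

// Simplified stand-ins for the question's entities, models and DAOs (assumptions).
data class PersonEntity(val id: Int, val name: String)
data class ItemEntity(val id: Int, val personId: Int, val title: String)
data class Item(val id: Int, val title: String)
data class Person(val id: Int, val name: String, val items: List<Item>)

interface PersonDao { fun getPersons(): Flow<List<PersonEntity>> }
interface ItemDao { fun getItems(): Flow<List<ItemEntity>> }

// Pure mapping kept outside the flow so it can be tested without coroutines.
fun joinPersonsWithItems(
    persons: List<PersonEntity>,
    items: List<ItemEntity>,
): List<Person> {
    val itemsByPerson = items.groupBy { it.personId }
    return persons.map { p ->
        Person(p.id, p.name, itemsByPerson[p.id].orEmpty().map { Item(it.id, it.title) })
    }
}

class PersonRepository(
    personDao: PersonDao,
    itemDao: ItemDao,
    scope: CoroutineScope, // injected, never created inside the repository
) {
    // Shared by all view models; the upstream runs only while someone is subscribed.
    val persons: SharedFlow<List<Person>> =
        combine(personDao.getPersons(), itemDao.getItems()) { p, i ->
            joinPersonsWithItems(p, i)
        }.shareIn(scope, SharingStarted.WhileSubscribed(5_000), replay = 1)
}
```

The view models would then depend on this repository for reads, while writes can keep going through SomeUseCase.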