Search code examples
kotlinretrofitandroid-roomkotlin-flow

Room flow not emitting on insert/update/upsert


I'm having issues where I have a flow of Room data that doesn't emit updates when changes are made to the underlying database table.

API Service

interface AtemsService {

    @GET("$PATH/data/_apps.json")
    suspend fun getPackageData(): Map<String, AtemsPackageInfo>
}

Remote Impl

class AtemsRemoteImpl @Inject constructor(private val atemsService: AtemsService) : AtemsRemote {

    override suspend fun getAtemsPackageData(): Map<String, AtemsPackageInfo> =
        atemsService.getPackageData()
}

Local Dao

@Dao
interface AtemsPackageDao {

    @Query("SELECT * FROM $TABLE_NAME")
    suspend fun getAll(): List<LocalAtemsPackage>

    @Upsert
    suspend fun upsertAll(localAtemsPackages: List<LocalAtemsPackage>)
}

Repo Impl

class AtemsPackageRepositoryImpl @Inject constructor(
    private val atemsRemote: AtemsRemote,
    private val atemsPackageDao: AtemsPackageDao,
    private val atemsPackageMapper: AtemsPackageMapper,
) : AtemsPackageRepository {

    override fun observeAtemsPackages(): Flow<List<AtemsPackage>> {
        return flow {
            val localAtemsPackages = atemsPackageDao.getAll()
            emit(atemsPackageMapper.localToDomain(localAtemsPackages))
            if (localAtemsPackages.isEmpty()) {
                val fetchRemotePackages = atemsRemote.getAtemsPackageData()
                val convertToLocal = atemsPackageMapper.remoteToLocal(fetchRemotePackages)
                atemsPackageDao.upsertAll(convertToLocal)
            }
        }
    }
}

Use Case

class AppsUseCaseImpl @Inject constructor(
    atemsPackageRepository: AtemsPackageRepository,
) : AppsUseCase {

    override val observeApps: Flow<List<AtemsPackage>> = atemsPackageRepository.observeAtemsPackages()
}

View Model

@HiltViewModel
class SidePanelAppListViewModel @Inject constructor(appsUseCase: AppsUseCase) : ViewModel() {

    val apps = appsUseCase.observeApps.stateIn(
        scope = viewModelScope,
        started = WhileUiSubscribed,
        initialValue = emptyList()
    )
}

and finally; View

@Composable
fun SidePanelAppList(
    modifier: Modifier = Modifier,
    viewModel: SidePanelAppListViewModel = hiltViewModel(),
) {
    val apps by viewModel.apps.collectAsStateWithLifecycle()

    SidePanelAppList(
        modifier = Modifier.then(modifier),
        apps = apps
    )
}

Everything is working as expected except when the method in the repo makes a network call to retrieve data, despite successfully saving the data to the Room database, my ui layer doesn't collect/display the updates unless the app is restarted. Any guidance would be immensely appreciated!

Update

Following the very helpful steer in the right direction by Tenfour04, I finished up with this (requirements changed slightly as I now update based on a value retrieved from the remote.

@OptIn(ExperimentalCoroutinesApi::class)
override fun observeAtemsPackages(): Flow<List<AtemsPackage>> =
    atemsPackageDao.observeAll().transformLatest {
        val lastLocalUpdate = settingsDataStore.getAtemsRemoteLastUpdate.first()
        val lastRemoteUpdate = atemsRepoRemote.getAtemsLastUpdate().lastUpdated
        if (lastRemoteUpdate > lastLocalUpdate) {
            val fetchRemotePackages = atemsRepoRemote.getAtemsPackageData()
            val convertToLocal = atemsPackageMapper.remoteToLocal(fetchRemotePackages)
            atemsPackageDao.upsertAll(convertToLocal)
            settingsDataStore.setAtemsRemoteLastUpdate(lastRemoteUpdate)
        }
        emit(atemsPackageMapper.localToDomain(it))
    }

Solution

  • The flow you created in the observe method never emits anything after updating the database. I think it would be cleanest to have your DAO expose the list as a Flow, and use an onEach to trigger your fetch if needed.

    DAO:

    @Dao
    interface AtemsPackageDao {
    
        @Query("SELECT * FROM $TABLE_NAME")
        fun getAll(): Flow<List<LocalAtemsPackage>>
    
        @Upsert
        suspend fun upsertAll(localAtemsPackages: List<LocalAtemsPackage>)
    }
    

    In your repo implementation:

    override fun observeAtemsPackages(): Flow<List<AtemsPackage>> 
        = atemsPackageDao.getAll()
            .onEach {
                if (it.isEmpty()) {
                    val fetchRemotePackages = atemsRemote.getAtemsPackageData()
                    val convertToLocal = atemsPackageMapper.remoteToLocal(fetchRemotePackages)
                    atemsPackageDao.upsertAll(convertToLocal)
                }
            }
    

    Now your flow is purely returning up-to-date database status. The onEach action is a side effect.

    If it’s possible you can fetch an empty list, you probably need to do something so it only attempts the fetch once:

    override fun observeAtemsPackages(): Flow<List<AtemsPackage>> 
        = flow {
            val source = atemsPackageDao.getAll()
            if (source.first().isEmpty()) {
                val fetchRemotePackages = atemsRemote.getAtemsPackageData()
                val convertToLocal = atemsPackageMapper.remoteToLocal(fetchRemotePackages)
                atemsPackageDao.upsertAll(convertToLocal)
            }
            emitAll(source)
        }