Is the onComplete method of a Subscriber called on a background thread by Room on Android?


I am implementing an Android app with Room. My project already includes the Guava and Jetpack libraries for other parts, but I do not mind adding another library (e.g. RxJava) if that helps to solve my problem. However, I have not found any authoritative documentation that answers my question.

I need to perform an asynchronous, observable query on the data, and the observation handler must run on a background thread because I have to do some costly post-processing of the result.

For Java, there are two options for interacting with Room asynchronously: RxJava or Guava/LiveData (see Write asynchronous DAO queries - Language and framework options).

If I wanted an asynchronous, one-shot (as opposed to observable) query and chose Guava/LiveData, the API would return a ListenableFuture (see Write asynchronous DAO queries - Write asynchronous one-shot queries). A ListenableFuture takes a Runnable as a listener together with an Executor, which is used to dispatch the listener once the future completes (see ListenableFuture#addListener). That's great because I already have a central ThreadPoolExecutor in my app for background tasks. However, this is a one-shot query, which is not what I want.

An asynchronous, observable query in combination with Guava/LiveData returns a LiveData (see Write asynchronous DAO queries - Write asynchronous observable queries). That's a pity because the onChanged method of an Observer of a LiveData is always executed on the main (GUI) thread. That's a design principle of LiveData, because it is meant to update the UI, but that's not what I need. Of course, I could use the onChanged method on the main thread to dispatch a Runnable to my background executor and jump back into the background, but that seems to involve unnecessary context switches between the background and the main thread.

So I was considering using RxJava. An asynchronous, observable query in combination with RxJava returns a Flowable. One can subscribe to the Flowable with a Consumer. However, I did not find any information on which thread the accept method of the Consumer is called when the Flowable emits a new value. According to the RxJava docs, that is the responsibility of the creator of the Flowable, because RxJava only defines abstract interfaces and does not stipulate a particular implementation. In the case at hand, the creator of the Flowable is the Room library, but it seems to be undocumented which thread Room uses for the Flowable.

Does Room use the main thread to update its Flowable? (That would be bad and no improvement over LiveData.) Does Room use the same background thread to update its Flowable as for database queries? (That would not be completely bad, but could still be improved.) Or does Room fork a new thread which is only used to update its Flowable? (Good.)

Bonus: To keep the number of forked and destroyed threads small, I would highly appreciate it if Room could use my application-wide ThreadPoolExecutor not only for Flowables but also for its queries and all other asynchronous tasks.


Solution

  • Things you can't find in the docs can always be found in the source code :D

    Assuming such query:

    @Query("SELECT * FROM table")
    fun query(): Flowable<List<Entity>>
    

    Room's annotation processor will generate some code, which will contain this:

    return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/});
    

    Checking the implementation of this method:

    Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
    final Maybe<T> maybe = Maybe.fromCallable(callable);
    return createFlowable(database, tableNames)
            .subscribeOn(scheduler)
            .unsubscribeOn(scheduler)
            .observeOn(scheduler)
            .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                @Override
                public MaybeSource<T> apply(Object o) throws Exception {
                    return maybe;
                }
            });
    

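    So Room uses a scheduler that is created from an executor, and because of observeOn(scheduler) the values are also emitted on that executor's threads. The documentation of androidx.room.RoomDatabase.Builder#setQueryExecutor has some information about this executor: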

         * When both the query executor and transaction executor are unset, then a default
         * {@code Executor} will be used. The default {@code Executor} allocates and shares threads
         * amongst Architecture Components libraries. If the query executor is unset but a
         * transaction executor was set, then the same {@code Executor} will be used for queries.
         * <p>
         * For best performance the given {@code Executor} should be bounded (max number of threads
         * is limited).
    

    Example

    The following code

        dao.query().subscribe({
            Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
        }, {
            Log.d("CheckThread1", "onError  ${Thread.currentThread().name}")
        })
    
        
    

    Will result in:

    D/CheckThread1: onSuccess arch_disk_io_0
    

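    So, as you can see, the consumer is not called on the main thread. It runs on a thread of the executor that Room also uses for its queries (here the default arch_disk_io pool), not on a thread forked just for the Flowable.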

    And yes, you can set your own executor when you build the database:

    Room.databaseBuilder(
        context.applicationContext,
        Database::class.java,
        "main.db"
    )
        .setTransactionExecutor(.../*your executor*/)
        .setQueryExecutor(.../*your executor*/)
        .build()
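
    Since the quoted Javadoc recommends a bounded executor, here is a minimal sketch of what such an application-wide pool could look like in plain Java. The class name AppExecutors, the method newBoundedPool, the thread-name prefix and the pool size are all made up for illustration; nothing here comes from Room itself. It only uses java.util.concurrent, and main just shows that a submitted task runs on one of the named pool threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Room or RxJava.
final class AppExecutors {
    private AppExecutors() {}

    /** Creates a pool that never grows beyond maxThreads; threads are named "<prefix>-<n>". */
    static ExecutorService newBoundedPool(int maxThreads, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true); // do not keep the JVM alive just for idle workers
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,      // core size == max size, so the pool is bounded
                30L, TimeUnit.SECONDS,       // idle threads are released after 30 seconds
                new LinkedBlockingQueue<>(), // extra work waits in the queue instead of forking threads
                factory);
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newBoundedPool(4, "app-bg");
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        // The task reports the name of the pool thread it ran on.
        System.out.println("task ran on " + pool.submit(whoAmI).get()); // prints "task ran on app-bg-0"
        pool.shutdown();
    }
}
```

    An executor like this could be passed to setQueryExecutor and setTransactionExecutor in the builder call above. If the post-processing should run on a different pool than the queries, adding another observeOn(Schedulers.from(yourExecutor)) to the Flowable returned by the DAO should move the consumer there, although I have not measured whether that is worth the extra hop.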