I am implementing an Android app with Room. At the moment, I already have included the Guave and Jetpack libaries into my project for other parts. However, I do not mind to include another library (i.e. JxJava), if this helps to solve my problem. However, I have not found any authoritative documentation for my question.
I need to to peform an asynchronous, observable query on the data and the observation handler must run on a background thread, because I have to do some costly post-processing of the result.
For Java, there are two options how asynchronously interact with room: using Rxjava or Guava/LiveData (see Write asynchronous DAO queries - Language and framework options)
If I wanted an asynchronous, one-shot (opposed to observable) query and take Guava/LiveData, then the API returns a ListenableFuture
(see Write asynchronous DAO queries - Write asynchronous one-shot queries). The ListenableFuture
takes a Runnable
as listener which is associated to an Executator
which is used to dispatch the listener when the data changes (see ListenableFuture#addListener). That's great because I already have a central ThreadPool
executor in my app for background tasks. However, this is a one-shot query which I do not want.
An asynchronous, observable query in combination with Guava/LiveData, returns a LiveData
(see Write asynchronous DAO queries - Write asynchronous observable queries. That's a pity because the onChange
method of a Observer
of a LiveData
is always executed on the main (GUI) thread. That's a design principle of LiveData
, because they are supposed to update the UI, but that's not want I need. Of course, I could use the onChange
method on the main thread, to dispatch a Runnable
on my background executor and jump off into the background again, but that seems to involve unnecessary context switches between background and main tread.
So I was considering to use RxJava. An asynchronous, observable query in combination with RxJava, returns a Flowable
. One can use a Consumer
to subscribe to the Flowable
. However, I did not find any information on which thread the accept
-method of Consumer
is dispatched, when the Flowable
emits a new value. According to the RxJava docs that is the responsibility of the creator of the Flowable
, because RxJava only defines abstract interfaces, but does not stipulate a particular implementation. In the case at hand, the creator of the Flowable
is the Room library, but it seems to be undocumented, which thread is used by Room for the Flowable
.
Does Room use the main thread to update its Flowable
? (That would be bad and no improvement over LiveData
). Does Room use the same background thread to update its Flowable
as for database queries? (That would not be completly bad, but still improvable.) Or does Room fork a new thread which is only used to update its Flowable
? (Good)
Bonus: In order to keep the number of forked and destroyed threads small, I would highly appreciate, if Room could use my application-wide ThreadPoolExecutor
not only for Flowables
but also for its queries and all other asynchronous tasks.
Things you can't find in the docs, can always be found in the source code:D
Assuming such query:
@Query("SELECT * FROM table")
fun query(): Flowable<List<Entity>>
Room's annotation processor will generate some code, which will contain this:
return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/}
Checking the implementation of this method:
Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.subscribeOn(scheduler)
.unsubscribeOn(scheduler)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
So they use a scheduler, which is created from executor.
You can find some info about this executor here androidx.room.RoomDatabase.Builder#setQueryExecutor
:
* When both the query executor and transaction executor are unset, then a default
* {@code Executor} will be used. The default {@code Executor} allocates and shares threads
* amongst Architecture Components libraries. If the query executor is unset but a
* transaction executor was set, then the same {@code Executor} will be used for queries.
* <p>
* For best performance the given {@code Executor} should be bounded (max number of threads
* is limited).
Example
The following code
dao.query().subscribe({
Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
}, {
Log.d("CheckThread1", "onError ${Thread.currentThread().name}")
})
Will result in:
D/CheckThread1: onSuccess arch_disk_io_0
So as you can see, this operation is not handled on a main thread.
And yes, you can set your own executor when you build the database:
Room.databaseBuilder(
context.applicationContext,
Database::class.java,
"main.db"
)
.setTransactionExecutor(.../*your executor*/)
.setQueryExecutor(.../*your executor*/)
.build()