I use Flowable
chain for periodic call of request (API) for tasks of users.
Flowable.interval(60, TimeUnit.SECONDS)
.flatMapCompletable(_tick ->
mUserRepository.getAllUsers()
.flatMapObservable(Observable::fromIterable)
.flatMap(user ->
downloadAndPersistTasks(user)
.subscribeOn(Schedulers.io())
)
.subscribeOn(Schedulers.io())
, false, 1
);
Method downloadAndPersistData
downloads current tasks of each user, remove all old tasks from database and persist downloaded tasks. Tasks of users can be changed from server side.
Problem is relatively long duration of downloading.
This case:
I need to disable downloading of data for specific user when insert API call is performed and disable function insert task when periodic update is performed.
Is there any RxJava solution or is the best choice to use native synchronization primitives of Java framework? But I don't need to skip periodic update, only delay it.
I am going to assume that the task insertion step is done locally, along with downloadAndPersistTasks()
.
Let's introduce a typed union: Either<L,R>
. It has static factory methods Either.<L>createLeft( L value )
and Either.<R>createRight( R value )
. It also has class methods: isLeft()
, isRight()
, and getLeft()
/getRight()
to do the natural thing.
A Subject
is used to inject the insert task step:
PublishSubject<InsertTask> taskInserter = PublishSubject.create();
Then we combine them in the following way:
Flowable.timer(60, SECONDS)
.flatMap( tick ->
Observable.fromIterable( mUserRepository.getAllUsers() ), 1 )
.map( t -> Either.createLeft(t) )
.mergeWith( taskInserter.map( i -> Either.createRight( i ) ), 1 )
.observeOn( scheduler )
.subscribe( ti -> {
if ( ti.isLeft() ) {
downloadAndPersistTasks( ti.getLeft() );
} else {
insertTask( ti.getRight() );
}
);
The second argument to flatMap()
ensures that only one user is processed at a time and the same for the mergeWith()
operator. Merging the second stream ensures that only one operation is ever performed at a time, and the observeOn()
operator puts all operations on to the same thread, so there will be no contention.
If you need greater parallelism, or finer grained control, you may need to introduce an observable per user.