I have a function that creates an Observable responsible for fetching some data. The function is called multiple times, so N background threads fetch data concurrently. Once data has been fetched, it has to be processed serially. Each round of processing should begin when a notification arrives, and the items are then processed one at a time. The order does not matter, and it is fine if some background threads are still fetching while the data we already have is being processed.
I have a solution implemented, but it is not entirely correct: currentSubject only ever delivers its first value.
Example code here:
package Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        var t = new Test();
        var currentSubject = PublishSubject.<String>create();
        var n = 4;
        // One zip pipeline per data item
        for (var i = 0; i < n; ++i) {
            Observable.zip(
                    currentSubject,
                    t.getData(String.valueOf(i), ThreadLocalRandom.current().nextInt(1, 5)),
                    (s1, s2) -> s1 + " for " + s2)
                .first("Default")
                .doOnEvent(
                    (s, e) -> {
                        System.out.println("Processing result: " + s);
                    })
                .subscribe();
        }
        // Send the notifications
        for (var i = 0; i < n; ++i) {
            Thread.sleep(i * 2000);
            currentSubject.onNext("A notification: " + i);
        }
        System.out.println("End");
    }

    public Observable<String> getData(String s, int sec) {
        return Observable.just(s)
            .delay(sec, TimeUnit.SECONDS)
            .doOnNext(
                dt -> {
                    System.out.println("Got data: " + dt);
                })
            .subscribeOn(Schedulers.newThread());
    }
}
What am I doing wrong, such that currentSubject only delivers its first value? I create a new Observable on every loop iteration; can this cause a problem, e.g. Observables staying in memory when they should have been disposed? And finally, how can I use currentSubject multiple times without a problem?
I was able to solve the problem by creating two PublishSubjects: one that triggers fetching the data on a background thread, and one that receives an item each time a notification arrives. With the zip/zipWith operator, each fetched item is paired with exactly one notification, so the data is processed one item at a time and only when a notification arrives.
Sample code:
package Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        var t = new Test();
        var fetchDataSubject = PublishSubject.<String>create();
        var notificationSubject = PublishSubject.<String>create();
        fetchDataSubject
            .flatMap(
                (e) -> {
                    return t.getData(String.valueOf(e), ThreadLocalRandom.current().nextInt(1, 4));
                })
            // Pair each fetched item with exactly one notification
            .zipWith(notificationSubject, (s1, s2) -> s1 + " for " + s2)
            // Process the pairs one by one on a single thread
            .observeOn(Schedulers.single())
            .subscribe(
                (s) -> {
                    System.out.println(
                        "Result: " + s + ", running on thread: " +
                        Thread.currentThread().getName());
                },
                (e) -> {
                    System.out.println("Error: " + e);
                });
        // Start fetching all data in background jobs
        var n = 4;
        for (var i = 0; i < n; i++) {
            fetchDataSubject.onNext(String.valueOf(i));
        }
        // Send notifications
        for (var i = 0; i < n; ++i) {
            notificationSubject.onNext(" \"Notification " + i + "\"");
            // Emulate notification arriving after the processing
            Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10) * 1000);
        }
        Thread.sleep(40000);
    }

    public Observable<String> getData(String s, int sec) {
        // Wait some time to fetch data on another thread
        return Observable.just(s)
            .delay(sec, TimeUnit.SECONDS)
            .doOnNext(
                dt -> {
                    System.out.println(
                        "Got data: "
                            + dt
                            + " after waiting: "
                            + sec
                            + "s, on thread "
                            + Thread.currentThread().getName());
                })
            .subscribeOn(Schedulers.newThread());
    }
}
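
As far as I can tell, the first version only ever showed the first notification because every loop iteration built its own zip, and each of those subscribed to currentSubject separately. Each zip therefore paired its single data item with the first notification it had buffered and then finished. With one shared zip, the i-th fetched item is paired with the i-th notification instead. The sketch below models only that pairing rule with plain collections; it is not RxJava code, and the class and method names (ZipPairingSketch, zip, zipPerItem) are made up for illustration. It also assumes every pipeline is subscribed before the first notification is sent, as in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stdlib-only model of zip's pairing rule; this is NOT RxJava code.
class ZipPairingSketch {

    // Models one shared zip: the i-th notification is paired with the i-th data item.
    static List<String> zip(List<String> notifications, List<String> data) {
        Deque<String> pendingNotifications = new ArrayDeque<>(notifications);
        Deque<String> pendingData = new ArrayDeque<>(data);
        List<String> out = new ArrayList<>();
        // A pair is emitted only while both sides still have a buffered item.
        while (!pendingNotifications.isEmpty() && !pendingData.isEmpty()) {
            out.add(pendingNotifications.poll() + " for " + pendingData.poll());
        }
        return out;
    }

    // Models the first attempt: one zip per data item, each seeing every notification.
    static List<String> zipPerItem(List<String> notifications, List<String> data) {
        List<String> out = new ArrayList<>();
        for (String item : data) {
            // Each separate zip starts again from the first notification.
            out.addAll(zip(notifications, List.of(item)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> notifications = List.of("N0", "N1", "N2");
        List<String> data = List.of("a", "b", "c");
        System.out.println(zipPerItem(notifications, data)); // [N0 for a, N0 for b, N0 for c]
        System.out.println(zip(notifications, data));        // [N0 for a, N1 for b, N2 for c]
    }
}
```

The first line of output mirrors what the original code printed (every item matched with notification 0), and the second mirrors the two-subject version, where each notification is consumed by exactly one data item.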