
Get data asynchronously and then process data serially with RxJava


I have a function that creates an Observable responsible for fetching some data. The function is called multiple times, so N background threads fetch data concurrently. Once the data have been fetched, I have to process them serially. Each round of processing should begin when a notification arrives, and the available data are then processed one by one. The order does not matter, and it is fine if X background threads are still fetching while the currently available data are being processed.

I currently have a solution implemented, but it is not entirely correct: every result is paired with the first value emitted by currentSubject.

Example code here:

package Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class Test {
  public static void main(String[] args) throws InterruptedException {
    var t = new Test();

    // `var` cannot take type arguments, so the element type is declared explicitly
    PublishSubject<String> currentSubject = PublishSubject.create();
    var n = 4;
    for (var i = 0; i < n; ++i) {
      Observable.zip(
              currentSubject,
              t.getData(String.valueOf(i), ThreadLocalRandom.current().nextInt(1, 5)),
              (s1, s2) -> s1 + " for " + s2)
          .first("Default")
          .doOnEvent(
              (s, e) -> {
                System.out.println("Processing result: " + s);
              })
          .subscribe();
    }

    for (var i = 0; i < n; ++i) {
      Thread.sleep(i * 2000);
      currentSubject.onNext("A notification: "+i);
    }
    System.out.println("End");
  }

  public Observable<String> getData(String s, int sec) {
    return Observable.just(s)
        .delay(sec, TimeUnit.SECONDS)
        .doOnNext(
            dt -> {
              System.out.println("Got data: " + dt);
            })
        .subscribeOn(Schedulers.newThread());
  }
}

What am I doing wrong such that "currentSubject" only delivers the first value? I create a new Observable on every loop iteration; can this cause a problem, e.g. Observables staying in memory when they should have been disposed? And finally, how can I use "currentSubject" multiple times without a problem?


Solution

  • I was able to solve the problem by creating two PublishSubjects: one that triggers the data fetches on background threads, and one that emits whenever a notification arrives. With the "zip"/"zipWith" operator, each fetched item is paired with exactly one notification, so the data are processed one by one and only when a notification arrives.

    Sample code:

    package Test;
    
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    import io.reactivex.rxjava3.subjects.PublishSubject;
    
    public class Test {
      public static void main(String[] args) throws InterruptedException {
      var t = new Test();
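      // `var` cannot take type arguments, so the element type is declared explicitly
      PublishSubject<String> fetchDataSubject = PublishSubject.create();
      PublishSubject<String> notificationSubject = PublishSubject.create();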
    
      fetchDataSubject
          .flatMap(
              (e) -> {
                return t.getData(String.valueOf(e), ThreadLocalRandom.current().nextInt(1, 4));
            })
          .zipWith(notificationSubject, (s1, s2) -> s1 + " for " + s2)
          .observeOn(Schedulers.single())
          .subscribe(
              (s) -> {
                System.out.println(
                    "Result: " + s + ", running on thread: " + 
                      Thread.currentThread().getName());
              },
              (e) -> {
                System.out.println("Error: " + e);
              });
    
      // Start fetching all data in background jobs
      var n = 4;
      for (var i = 0; i < n; i++) {
        fetchDataSubject.onNext(String.valueOf(i));
      }
      
      // Send notifications
      for (var i = 0; i < n; ++i) {
        notificationSubject.onNext(" \"Notification " + i + "\"");
    
        // Emulate notification arriving after the processing
        Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10) * 1000);
      }
    
      Thread.sleep(40000);
    }
    
    public Observable<String> getData(String s, int sec) {
    
      // Wait some time to fetch data on another thread
      return Observable.just(s)
          .delay(sec, TimeUnit.SECONDS)
          .doOnNext(
              dt -> {
                System.out.println(
                    "Got data: "
                        + dt
                        + " after waiting: "
                        + sec
                        + "s, on thread "
                        + Thread.currentThread().getName());
              })
          .subscribeOn(Schedulers.newThread());
      }
    }
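
The difference between the two versions may be easier to see in a small plain-Java model of how zip pairs items. This is only a sketch of the pairing behaviour (the n-th item of one source is combined with the n-th item of the other, with unmatched items buffered); it is not RxJava's implementation, and the names ZipModel, Zip, onePerFetch and shared are hypothetical. In the question, every fetch has its own zip subscribed to the same subject, so each of them buffers the first notification and pairs it with its single data item. In the solution, one zip is shared by all fetches, so each data item consumes a different notification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ZipModel {

  /** Hypothetical stand-in for one zip subscription: pairs items by arrival index. */
  static final class Zip {
    private final Queue<String> notifications = new ArrayDeque<>();
    private final Queue<String> data = new ArrayDeque<>();
    private final List<String> out;

    Zip(List<String> out) {
      this.out = out;
    }

    void onNotification(String n) {
      notifications.add(n);
      drain();
    }

    void onData(String d) {
      data.add(d);
      drain();
    }

    // Emit a pair whenever both sides have at least one buffered item.
    private void drain() {
      while (!notifications.isEmpty() && !data.isEmpty()) {
        out.add(notifications.poll() + " for " + data.poll());
      }
    }
  }

  /** Shape of the question's code: one zip per fetch, all fed by the same notifications. */
  public static List<String> onePerFetch(int n) {
    List<String> out = new ArrayList<>();
    List<Zip> zips = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      zips.add(new Zip(out));
    }
    // Every notification is broadcast to every zip, so each zip buffers N0 first.
    for (int i = 0; i < n; i++) {
      for (Zip z : zips) {
        z.onNotification("N" + i);
      }
    }
    // Each zip receives exactly one data item and pairs it with its own copy of N0.
    for (int i = 0; i < n; i++) {
      zips.get(i).onData("D" + i);
    }
    return out;
  }

  /** Shape of the solution: a single zip shared by all fetches. */
  public static List<String> shared(int n) {
    List<String> out = new ArrayList<>();
    Zip zip = new Zip(out);
    for (int i = 0; i < n; i++) {
      zip.onData("D" + i);
    }
    // Each notification releases exactly one buffered data item.
    for (int i = 0; i < n; i++) {
      zip.onNotification("N" + i);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(onePerFetch(3)); // [N0 for D0, N0 for D1, N0 for D2]
    System.out.println(shared(3));      // [N0 for D0, N1 for D1, N2 for D2]
  }
}
```

In the model, the order in which data and notifications arrive does not change the pairing, only the moment at which a pair is emitted. That matches the requirement that processing waits for a notification but does not care which fetch finished first.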