Tags: java, spring-webflux, reactor

Leverage PriorityBlockingQueue to build a producer-consumer pattern in Java Reactor


In my project, a Spring scheduler periodically scans "TO BE DONE" tasks from the database and distributes them to a task consumer for subsequent handling. The current implementation places a Reactor Sink between the producer and the consumer.

Sinks.Many<Task> taskSink = Sinks.many().multicast().onBackpressureBuffer(1000, false);

Producer:

Flux<Date> dates = loadDates();
dates.filterWhen(...)
     .concatMap(date -> taskManager.getTaskByDate(date))
     .doOnNext(taskSink::tryEmitNext)
     .subscribe();

Consumer:

taskProcessor.process(taskSink.asFlux())
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();

Using the Sink works fine in most cases, but when the system is under heavy load, the system maintainers want to know:

  1. How many tasks are still sitting in the Sink?
  2. Is it possible to clear all tasks within the Sink?
  3. Is it possible to prioritize tasks within the Sink?

Unfortunately, a Sink cannot fulfill all of the needs mentioned above, so I created a wrapper class that combines a Map and a PriorityBlockingQueue. I based the implementation on this answer: https://stackoverflow.com/a/71009712/19278017.
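For context, here is a minimal sketch of what such a wrapper could look like. The names MergingQueue and PriorityMergingQueue follow the question, but the map-based merging of duplicate tasks from the linked answer is omitted, and tasks are assumed to have a natural ordering:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical interface matching the calls used in the revised code below.
interface MergingQueue<T> {
    void enqueue(T element);
    T poll(long timeout, TimeUnit unit) throws InterruptedException;
    int size();
    void clear();
}

// Sketch only: backed by a plain PriorityBlockingQueue, without the Map-based
// merging of duplicates that the linked answer describes.
public class PriorityMergingQueue<T extends Comparable<? super T>> implements MergingQueue<T> {

    private final PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<>();

    @Override
    public void enqueue(T element) {
        queue.offer(element);
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Blocks for up to the given timeout; returns null if no task arrived.
        return queue.poll(timeout, unit);
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public void clear() {
        queue.clear();
    }
}

The size() and clear() methods are what later make it possible to report and drop pending tasks.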

After that, the original producer-consumer code was revised as below:

Task queue:

MergingQueue<Task> taskQueue = new PriorityMergingQueue<>();

Producer:

Flux<Date> dates = loadDates();
dates.filterWhen(...)
     .concatMap(date -> taskManager.getTaskByDate(date))
     .doOnNext(taskQueue::enqueue)
     .subscribe();

Consumer:

taskProcessor.process(Flux.<Task>create(sink ->
     sink.onRequest(n -> {
          try {
               long remaining = n;
               while (!sink.isCancelled() && remaining > 0) {
                    // Blocks for up to 1 second waiting for the next task
                    Task task = taskQueue.poll(1, TimeUnit.SECONDS);
                    if (task != null) {
                         sink.next(task);
                         remaining--;
                    }
               }
          } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               sink.error(e);
          }
     })))
     .subscribeOn(Schedulers.boundedElastic())
     .subscribe();

I got some questions as below:

  • Is it an issue that the code calls .poll()? I ran into a thread-hang issue during longevity testing and am not sure whether it is caused by the poll() call.
  • Is there an alternative solution in Reactor that works like a PriorityBlockingQueue?

Solution

  • The goal of reactive programming is to avoid blocking operations. PriorityBlockingQueue.poll() will cause issues as it will block the thread waiting for the next element.

    There is, however, an alternative solution in Reactor: the unicast version of Sinks.Many allows buffering with an arbitrary Queue via Sinks.many().unicast().onBackpressureBuffer(Queue<T>). By using a PriorityQueue instantiated outside of the Sink, you can fulfill all three requirements.

    Here is a short demo where I emit a Task every 100ms:

    public record Task(int prio) {}
    
    private static void log(Object message) {
        System.out.println(LocalTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MILLIS) + ": " + message);
    }
    
    public void externalBufferDemo() throws InterruptedException {
        Queue<Task> taskQueue = new PriorityQueue<>(Comparator.comparingInt(Task::prio).reversed());
        Sinks.Many<Task> taskSink = Sinks.many().unicast().onBackpressureBuffer(taskQueue);
    
        taskSink.asFlux()
                .delayElements(Duration.ofMillis(100))
                .subscribe(task -> log(task));
    
        for (int i = 0; i < 10; i++) {
            taskSink.tryEmitNext(new Task(i));
        }
        // Show amount of tasks sitting in the Sink:
        log("Nr of tasks in sink: " + taskQueue.size());
    
        // Clear all tasks in the sink after 350ms:
        Thread.sleep(350);
        taskQueue.clear();
        log("Nr of tasks after clear: " + taskQueue.size());
    
        Thread.sleep(1500);
    }
    

    Output:

    09:41:11.347: Nr of tasks in sink: 9
    09:41:11.450: Task[prio=0]
    09:41:11.577: Task[prio=9]
    09:41:11.687: Task[prio=8]
    09:41:11.705: Nr of tasks after clear: 0
    09:41:11.799: Task[prio=7]
    

    Note that delayElements has an internal queue of size 1, which is why Task 0 was picked up before Task 1 was emitted, and why Task 7 was picked up after the clear.

    If multicast is required, you can transform the Flux using one of the many operators that enable multicasting.
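
    For example (a minimal sketch, not part of the original answer), share() turns the single-subscriber Flux from the unicast sink above into a hot, multicast Flux:

    Flux<Task> multicastTasks = taskSink.asFlux()
            .share(); // subscribe upstream once, multicast to all downstream subscribers

    // Both consumers receive the tasks emitted into the unicast sink.
    multicastTasks.subscribe(task -> log("Consumer A: " + task));
    multicastTasks.subscribe(task -> log("Consumer B: " + task));

    share() is equivalent to publish().refCount(), so the underlying unicast Flux is subscribed only once, when the first downstream subscriber arrives.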