java asynchronous grpc nonblocking completable-future

Java CompletableFuture for sequential code


My new team is writing a Java gRPC service, and to ensure that we never block the request thread we ended up wrapping more or less ALL methods inside a CompletableFuture, even if those endpoints are conceptually a sequential list of operations (no parallelism).

So the code looks something like this (a Java example is available at the end if needed):

  methodA()
    methodB()
      methodD() (let's say this one is a 15ms RPC call)
      methodE()
    methodC()
      methodF() (let's say this one is 5ms of CPU-intensive work)
      methodG()
 

Context:

  • In practice the application is much bigger and there are many more layers of functions
  • Each application host needs to handle 1000 QPS, so you can imagine that methodA is called at that rate
  • Some functions (a few) make an RPC call that can take 5-30ms (IO)
  • Some functions (very few) run CPU-intensive work (< 5ms)

Edit 1: After more reading online yesterday, I understand that if, and only if, we are using true non-blocking HTTP and DB clients (and it doesn't seem like JDBC is non-blocking), this pattern can reduce the total number of threads required. My understanding is that if we have enough memory to keep one thread per request, using a synchronous code would still probably be the most efficient implementation (it reduces the overhead of switching threads and loading data), but if we didn't have enough memory to keep that many threads alive, then making the whole code non-blocking can reduce the number of threads and thus allow the application to scale to more requests.

Question 1: I understand this unblocks the "request thread", but in practice what's the advantage? Are we truly saving CPU time? In the example below, it feels like "some" thread will be alive the whole time anyway (mostly the thread from CompletableFuture.supplyAsync in methodD); it just happens that it’s not the same thread as the one that received the initial request.

Question 2: Is that pattern truly a "best practice", and should all services follow a similar pattern? Apart from making the code a bit harder to read, I feel that per request 50+ methods get invoked and 50+ times we call a mix of CompletableFuture .thenCompose() or .supplyAsync. It seems like it would be adding some overhead. Was CompletableFuture designed to be used that way across the whole code base, in every method?

Annex (java example):

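  public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    methodA(10)
        // thenAccept because the callback returns nothing; mapping the
        // Integer result to a MyResponse is left out of this example
        .thenAccept((response) -> {
          responseObserver.onNext(response);
          responseObserver.onCompleted();
        });
  }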

  public CompletableFuture<Integer> methodA(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodB)
        .thenCompose(this::methodC)
        .thenApply((i) -> {
          System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodB(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodD)
        .thenCompose(this::methodE)
        .thenApply((i) -> {
          System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodC(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodF)
        .thenCompose(this::methodG)
        .thenApply((i) -> {
          System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodD(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Assume it's an RPC call that takes 5-30ms
        Thread.sleep(20);
        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodE(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodF(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Let's assume it's CPU-intensive work that takes 2-5ms
        Thread.sleep(5);
        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodG(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }

Solution

  • The premise is that threads are a scarce resource, which is not intrinsic to threads but a consequence of using a pool of threads with a configured maximum. The reason today’s frameworks use a pool is that threads, as implemented today, are expensive and creating too many of them can cause significant performance problems.

    You wrote

    My understanding is that if we have enough memory to keep one thread per request, using a synchronous code would still probably be the most efficient implementation…

    which is going in the right direction, but it’s important to keep in mind that there might be more constraints than memory. Some operating systems’ schedulers become significantly less efficient with a large number of threads, and some may even have a fixed limit on how many threads a process is allowed to create.

    So, when you block a thread by waiting for another, you are limiting the parallel processing capabilities of the thread pool. This applies if you are using, as you put it, a “true non-blocking” API, or just any already existing API that returns futures. Submitting your own operations via supplyAsync serves no purpose, as the supplier’s code is still executed by a thread, as you correctly pointed out.
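
As a minimal sketch of that last point, the example below submits two blocking suppliers to a pool with a single worker and reports which thread ran them. The class name, the `blockingCall` helper, and the thread name `worker-1` are assumptions made for illustration; the sleep only stands in for a synchronous RPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncStillUsesAThread {

    // Hypothetical stand-in for a blocking call such as a synchronous RPC.
    // Returns the name of the thread that had to sit through the wait.
    static String blockingCall(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Thread.currentThread().getName();
    }

    // Submits two blocking suppliers to a pool with one worker and returns
    // the names of the threads that executed them.
    static String[] runOnSingleWorker() {
        ExecutorService pool =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "worker-1"));
        try {
            CompletableFuture<String> first =
                    CompletableFuture.supplyAsync(() -> blockingCall(20), pool);
            CompletableFuture<String> second =
                    CompletableFuture.supplyAsync(() -> blockingCall(20), pool);
            // join() is used here only to collect the demo's results.
            return new String[] { first.join(), second.join() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = runOnSingleWorker();
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("first ran on: " + names[0]);
        System.out.println("second ran on: " + names[1]);
    }
}
```

Both suppliers report the same worker thread: the wait was moved off the caller, but a thread was still occupied for its whole duration, and the second supplier had to queue behind the first.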

    But when you have an existing future returned by an operation, you should rather chain dependent processing steps instead of waiting for its result via join and friends. Note that calling join() on existing futures can make things even worse than just blocking threads:

    When you call join() on a CompletableFuture, it tries to compensate for the problem. When the caller is a worker thread of a Fork/Join pool, one of two things can happen:

    • Instead of doing nothing, it may try to fetch pending jobs and execute them in-place, similar to awaitQuiescence.
      • In the best case, it will directly pick up the job you just scheduled with supplyAsync (if using the same pool) and execute it, almost as if you executed it without CompletableFuture (just consuming far more stack space).
      • In the worst case, the thread will be busy executing a long running, entirely unrelated job while the job it’s actually waiting for has been completed long ago. Imagine what happens if that unrelated job also calls join.
    • It may end up actually blocking the thread but using ForkJoinPool.managedBlock(…), which may start a new worker thread to ensure that the pool’s configured parallelism is kept. This solves the problem of reduced parallelism but, on the other hand, reintroduces the very problem of resource consumption you actually wanted to solve with thread pools.

    The worst of all is that you can’t even predict which of the two things will happen.
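
The advice to chain dependent steps onto an existing future, rather than wait for it with join, might look like the sketch below. The hand-completed `CompletableFuture` only stands in for a future returned by a non-blocking client; `ChainInsteadOfJoin`, `addOne`, and `chain` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ChainInsteadOfJoin {

    // Hypothetical dependent step; it runs only once the upstream value exists.
    static int addOne(int value) {
        return value + 1;
    }

    // Attaches the dependent steps and returns immediately, without waiting.
    static CompletableFuture<Integer> chain(CompletableFuture<Integer> upstream) {
        return upstream
                .thenApply(ChainInsteadOfJoin::addOne)
                .thenApply(ChainInsteadOfJoin::addOne);
    }

    public static void main(String[] args) {
        // Stands in for a future handed out by a non-blocking client.
        CompletableFuture<Integer> rpcResult = new CompletableFuture<>();

        CompletableFuture<Integer> response = chain(rpcResult);
        System.out.println("done before completion: " + response.isDone());

        // In a real client, its callback thread would complete the future.
        rpcResult.complete(10);
        System.out.println("result: " + response.join());
    }
}
```

No thread waits between building the chain and the completion of `rpcResult`; the dependent steps run when the value arrives. The final `join()` in `main` is there only to print the result of an already completed future.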


    There are, however, cases where not blocking a request thread by utilizing other threads makes sense. Most notably when the response time for the request itself matters and the results of the background computation are delivered independently of the initial response. The most prominent example of this pattern is the event dispatch thread of GUI frameworks, which must be kept free of long-running operations to be able to process subsequent user input.


    Note that there is a general solution that can make 99% of all future chains obsolete. Virtual threads, a preview feature in JDK 19 that was finalized in JDK 21, are cheap to create and allow you to create one thread per request, just like you envisioned in the quote above. When a virtual thread gets blocked, it will release the underlying platform thread for the next virtual thread, so there is no reason to hesitate to call join() on any future, even those belonging to “true non-blocking” APIs.

    The best way to interoperate with this concept and the status quo is to design methods that do not return futures but perform the operation in-place. It’s still possible to build a future chain when necessary, e.g. by using .thenApplyAsync(this::inPlaceEvalMethod) instead of .thenCompose(this::futureReturningMethod). But at the same time, you can write a plain sequential version that just calls these methods, which can be executed by a virtual thread. In fact, you could even add the plain sequential version today and benchmark both approaches. The results might convince your team members that “not blocking the request thread” is not necessarily an improvement.
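
A minimal sketch of that design, assuming two hypothetical in-place steps `stepD` and `stepE` that mirror the `input + 1` methods from the question: the same plain methods serve both a sequential caller and a future chain.

```java
import java.util.concurrent.CompletableFuture;

class InPlaceMethods {

    // Plain methods: they do their work in place and return the value directly.
    static int stepD(int input) {
        return input + 1;
    }

    static int stepE(int input) {
        return input + 1;
    }

    // Sequential version: ordinary calls on whatever thread invokes it.
    static int sequential(int input) {
        return stepE(stepD(input));
    }

    // Future-chain version built from the very same methods.
    static CompletableFuture<Integer> chained(int input) {
        return CompletableFuture
                .supplyAsync(() -> stepD(input))
                .thenApplyAsync(InPlaceMethods::stepE);
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sequential(10));
        System.out.println("chained: " + chained(10).join());
    }
}
```

Because `sequential` is ordinary blocking-style code, it could be run unchanged on a virtual thread on a JDK that supports them, and the two variants can be benchmarked side by side as suggested above.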