
Does calling join() in the following CompletableFuture example block the process?


I am trying to understand CompletableFuture and the chaining of calls that return completed futures. I created the example below, which simulates two calls to a database.

The first method is supposed to return a CompletableFuture with a list of userIds. I then need to call another method with each userId to get the user (a string in this case).

To summarise:

  1. fetch the ids
  2. fetch a list of the users corresponding to those ids.

I created simple methods that simulate the responses using Thread.sleep. Please check the code below.

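import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PipelineOfTasksExample {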

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());
        return cfId;
    }

    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenCompose(listOfIds ->
                        CompletableFuture.supplyAsync(
                                () -> listOfIds.parallelStream()
                                        .map(id -> example.fetchById(id).join())
                                        .collect(Collectors.toList()
                                        )
                        )
                );

        System.out.println(result.join());
    }

}

My question is: does the join call (example.fetchById(id).join()) ruin the nonblocking nature of the process? If so, how can I solve this problem?

Thank you in advance


Solution

  • Your example is a bit odd: returnUserIdsFromDb() slows down the main thread before any operation even starts, and likewise fetchById slows down the caller rather than the asynchronous operation, which defeats the entire purpose of asynchronous operations.
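
    To make the distinction concrete, here is a minimal sketch that contrasts the two placements of the delay. The class name, the method names delayInCaller and delayInTask, and the 200 ms pause are all hypothetical and chosen only for illustration.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;

    class DelayPlacementSketch {
        // Simulated latency; parkNanos may return early, so the pause is approximate.
        static void pause() {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
        }

        // Shape used in the question: the caller waits, then a future is
        // created for work that is already trivial.
        static CompletableFuture<String> delayInCaller() {
            pause();
            return CompletableFuture.supplyAsync(() -> "value");
        }

        // Intended shape: the caller gets the future right away and the
        // delay happens inside the asynchronous task.
        static CompletableFuture<String> delayInTask() {
            return CompletableFuture.supplyAsync(() -> {
                pause();
                return "value";
            });
        }

        public static void main(String[] args) {
            long t0 = System.nanoTime();
            CompletableFuture<String> a = delayInCaller();
            long t1 = System.nanoTime();
            CompletableFuture<String> b = delayInTask();
            long t2 = System.nanoTime();
            System.out.println("delayInCaller returned after ms: "
                + TimeUnit.NANOSECONDS.toMillis(t1 - t0));
            System.out.println("delayInTask returned after ms: "
                + TimeUnit.NANOSECONDS.toMillis(t2 - t1));
            System.out.println(a.join() + " " + b.join());
        }
    }
    ```

    Both futures end up with the same value; what differs is which thread pays for the wait.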

    Further, instead of .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …)) you can simply use .thenApplyAsync(listOfIds -> …).
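
    As a rough illustration of that equivalence, the sketch below builds the same result both ways. The helpers ids() and toNames() are hypothetical stand-ins, not part of the question's code.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    class ThenApplyAsyncSketch {
        // Hypothetical stand-in for the first asynchronous call.
        static CompletableFuture<List<Long>> ids() {
            return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
        }

        // Hypothetical synchronous transformation of the id list.
        static List<String> toNames(List<Long> ids) {
            return ids.stream().map(id -> "user" + id).collect(Collectors.toList());
        }

        // Longer form: wrap the function in a new future, then flatten it.
        static CompletableFuture<List<String>> viaThenCompose() {
            return ids().thenCompose(list ->
                CompletableFuture.supplyAsync(() -> toNames(list)));
        }

        // Shorter form: let the future schedule the function asynchronously.
        static CompletableFuture<List<String>> viaThenApplyAsync() {
            return ids().thenApplyAsync(ThenApplyAsyncSketch::toNames);
        }

        public static void main(String[] args) {
            System.out.println(viaThenCompose().join());    // [user1, user2, user3, user4]
            System.out.println(viaThenApplyAsync().join()); // [user1, user2, user3, user4]
        }
    }
    ```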

    A better version of the question's example might be

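    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class PipelineOfTasksExample {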
        private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
            .collect(Collectors.toMap(id -> id, id -> "user"+id));
    
        PipelineOfTasksExample() {}
    
        private static <T> T slowDown(String op, T result) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            System.out.println(op + " -> " + result + " thread: "
                + Thread.currentThread().getName()+ ", "
                + POOL.getPoolSize() + " threads");
            return result;
        }
        private CompletableFuture<List<Long>> returnUserIdsFromDb() {
            System.out.println("trigger building the list of Ids - thread: "
                + Thread.currentThread().getName());
            return CompletableFuture.supplyAsync(
                () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
                POOL);
        }
        private CompletableFuture<String> fetchById(Long id) {
            System.out.println("trigger fetching id: " + id + " thread: "
                + Thread.currentThread().getName());
            return CompletableFuture.supplyAsync(
                () -> slowDown("fetching id: " + id , db.get(id)), POOL);
        }
    
        static ForkJoinPool POOL = new ForkJoinPool(2);
    
        public static void main(String[] args) {
            PipelineOfTasksExample example = new PipelineOfTasksExample();
            CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenApplyAsync(listOfIds ->
                    listOfIds.parallelStream()
                        .map(id -> example.fetchById(id).join())
                        .collect(Collectors.toList()),
                    POOL
                );
            System.out.println(result.join());
        }
    }
    

    which prints something like

    trigger building the list of Ids - thread: main
    building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
    trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
    trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
    trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
    fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
    fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
    fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
    trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
    fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
    [user1, user2, user3, user4]
    

    which might be a surprising number of threads at first glance.

    The answer is that join() may block the thread, but if this happens inside a worker thread of a Fork/Join pool, the situation is detected and a new compensation thread is started to maintain the configured target parallelism.

    As a special case, when the default Fork/Join pool is used, the implementation may pick up new pending tasks within the join() method, to ensure progress within the same thread.

    So the code will always make progress, and there’s nothing wrong with calling join() occasionally if the alternatives are much more complicated. Used excessively, however, it can consume too many resources. After all, the reason to use thread pools is to limit the number of threads.

    The alternative is to use chained dependent operations where possible.

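    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class PipelineOfTasksExample {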
        private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
            .collect(Collectors.toMap(id -> id, id -> "user"+id));
    
        PipelineOfTasksExample() {}
    
        private static <T> T slowDown(String op, T result) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            System.out.println(op + " -> " + result + " thread: "
                + Thread.currentThread().getName()+ ", "
                + POOL.getPoolSize() + " threads");
            return result;
        }
        private CompletableFuture<List<Long>> returnUserIdsFromDb() {
            System.out.println("trigger building the list of Ids - thread: "
                + Thread.currentThread().getName());
            return CompletableFuture.supplyAsync(
                () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
                POOL);
        }
        private CompletableFuture<String> fetchById(Long id) {
            System.out.println("trigger fetching id: " + id + " thread: "
                + Thread.currentThread().getName());
            return CompletableFuture.supplyAsync(
                () -> slowDown("fetching id: " + id , db.get(id)), POOL);
        }
    
        static ForkJoinPool POOL = new ForkJoinPool(2);
    
        public static void main(String[] args) {
            PipelineOfTasksExample example = new PipelineOfTasksExample();
    
            CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenComposeAsync(listOfIds -> {
                    List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
                        .map(id -> example.fetchById(id))
                        .collect(Collectors.toList());
                    return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                        .thenApply(_void -> jobs.stream()
                            .map(CompletableFuture::join).collect(Collectors.toList()));
                    },
                    POOL
                );
    
            System.out.println(result.join());
            System.out.println(ForkJoinPool.commonPool().getPoolSize());
        }
    }
    

    The difference is that all asynchronous jobs are submitted first. Then a dependent action that calls join on them is scheduled, to be executed only when all jobs have completed, so these join invocations will never block. Only the final join call at the end of the main method may block the main thread.

    So this prints something like

    trigger building the list of Ids - thread: main
    building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
    trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
    trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
    trigger fetching id: 4 thread: ForkJoinPool-1-worker-1
    trigger fetching id: 1 thread: ForkJoinPool-1-worker-0
    fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads
    fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads
    fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads
    fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads
    [user1, user2, user3, user4]
    

    showing that no compensation threads had to be created, so the number of threads matches the configured target parallelism.

    Note that if the actual work is done in a background thread rather than within the fetchById method itself, you no longer need a parallel stream, as there is no blocking join() call. For such scenarios, just using stream() will usually result in better performance.
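
    A minimal sketch of that last variant, assuming the default executor and a hypothetical fetchById whose delay runs inside the asynchronous task (the class name, the fetchAll helper and the 100 ms delay are illustrative only):

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    import java.util.stream.Collectors;

    class SequentialSubmitSketch {
        // Hypothetical lookup: the simulated delay runs in the pool thread,
        // so this method itself returns immediately.
        static CompletableFuture<String> fetchById(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                return "user" + id;
            });
        }

        static CompletableFuture<List<String>> fetchAll(List<Long> ids) {
            // A sequential stream is enough to submit the jobs, because
            // submitting does not wait for any of them.
            List<CompletableFuture<String>> jobs = ids.stream()
                .map(SequentialSubmitSketch::fetchById)
                .collect(Collectors.toList());
            // The dependent action runs only after every job has completed,
            // so the join() calls inside it return without blocking.
            return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> jobs.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));
        }

        public static void main(String[] args) {
            System.out.println(fetchAll(Arrays.asList(1L, 2L, 3L, 4L)).join());
        }
    }
    ```

    The result list keeps the order of the input ids, since it is collected from the jobs list rather than in completion order.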