Search code examples
javajava.util.concurrentforkjoinpool

Java 17 Fork join pool limit number of threads in pool


In java 17, we are seeing that a lot more threads are getting created (although not concurrently executing) as compared to older java version (java 8 in my case). While thread itself may not be a huge problem, In each thread DB connection is being made, hence we end up breaking the max concurrent session limit by our DB client.

public class TestForkJoin {

    public static void main(String[] args) {
        ClassA a = new ClassA();
        System.out.println(a.fetchAll(Collections.singleton("ok")));
    }

    public abstract static class Base {
        public Map<String, String> fetchAll(Set<String> ids) {
            return ids.stream().parallel().map(i -> ImmutablePair.of(i, fetch(i))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        }

        public abstract String fetch(String id);
    }

    public static class ClassA extends Base {

        @Override
        public String fetch(String id) {
            ClassB b = new ClassB();
            Map<String, String> result = b.fetchAll(Collections.singleton(id));
            return String.join(":", result.values());
        }
    }

    public static class ClassB extends Base {

        @Override
        public String fetch(String id) {
            return id;
        }
    }
}

In the above example we are recursively calling .parallel(). For example

A.fetchAll() -> .parallel()  -> A.fetch() -> B.fetchAll() -> .parallel(). 

To Deep dive on this. .parallel() uses forkJoinPool.commonpool() whose parallelism is controlled by number of cores on the machine or java.util.concurrent.ForkJoinPool.common.parallelism.

In java 8, i was seeing that only 8 thread were concurrently executing and the pool size was also 8. but In java17, event though parallelism is 8, the pool size goes upto 150. Below are some of the option i evaluated

  1. Not using fork join pool but use executor service with fixed core pool size

    But this option does not work for us, because if more number of concurrent request comes on classA.fetchAll, then the threads would get in deadlock state. In fact this is one the of the primary advantage of using fork join pool over executor service. see here

  2. Instead of using forkjoin common pool, use custom fork join pool with limit on pool size Whenever the pool size limit is reached exception is thrown. So this practically does not solve anything.

    java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    

Basically I want, infinite size queue, but number of threads in pool should not increase.

This openjdk issues seems to be related to this.


Solution

  • I was not able to reproduce the issue on local, because i was trying to reproduce using Thread.sleep() but Thread.sleep is a graceful way to tell that thread wants to sleep. Rather than doing thread.sleep I started making http call.

    As @DuncG suggested in comment using maximumPoolSize, along with saturation predicate fixed the issue. Set maximumPoolSize equals to number of connections we can afford. And always return saturated = true in the predicate.

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.commons.lang3.tuple.Pair;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class Test2 {
    
        private static final ForkJoinPool forkJoinPool = new ForkJoinPool(4, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false, 0, 10, 1, pool -> {
            boolean allow = true;
            System.out.println(Thread.currentThread().getName() + " saturate:" + allow);
            return allow;
        }, 3L, TimeUnit.SECONDS);
        private static final ExecutorService executorService = Executors.newFixedThreadPool(1);
    
        public static void main(String[] args) {
            logPoolStats();
            ClassA a = new ClassA();
            System.out.println(a.fetchAll(IntStream.range(1, 10).mapToObj(String::valueOf).collect(Collectors.toList())));
        }
    
    
        public abstract static class Base {
            public Map<String, String> fetchAll(List<String> ids) {
                List<Callable<ImmutablePair<String, String>>> todo = new ArrayList<>();
                ids.forEach(id -> todo.add(() -> ImmutablePair.of(id, fetch(id))));
    
                List<Future<ImmutablePair<String, String>>> futures = forkJoinPool.invokeAll(todo);
    
                return futures.stream().map(i -> {
                    try {
                        return i.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
            }
    
            public abstract String fetch(String id) throws IOException, InterruptedException;
        }
    
        public static class ClassA extends Base {
    
            @Override
            public String fetch(String id) {
                ClassB b = new ClassB();
                Map<String, String> result = b.fetchAll(IntStream.range(1, 10).mapToObj(String::valueOf).collect(Collectors.toList()));
                return String.join(":", result.values());
            }
        }
    
        public static class ClassB extends Base {
            @Override
            public String fetch(String id) throws IOException, InterruptedException {
                try {
                    HttpClient.newBuilder().build().send(HttpRequest.newBuilder(URI.create("https://stackoverflow.com/" + id)).build(), HttpResponse.BodyHandlers.ofString()).body();
                } catch (Exception ignored) {}
                return "ok";
            }
        }
    
    
        private static void logPoolStats() {
            executorService.submit(() -> {
                while (true) {
                    System.out.println("active.threads " + forkJoinPool.getActiveThreadCount());
                    System.out.println("running.threads " + forkJoinPool.getRunningThreadCount());
                    System.out.println("pool.size " + forkJoinPool.getPoolSize());
                    System.out.println("queue.submission " + forkJoinPool.getQueuedSubmissionCount());
                    System.out.println("parallelism " + forkJoinPool.getParallelism());
                    System.out.println("queue.task " + forkJoinPool.getQueuedTaskCount());
                    Thread.sleep(1000);
                }
            });
        }
    }