Search code examples
javamultithreadingjava-8fork-joinforkjoinpool

ForkJoinPool - Why program is throwing OutOfMemoryError?


I wanted to try out ForkJoinPool in Java 8 so i wrote a small program for searching all the files whose name contains a specific keyword in a given directory.

Program:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }

                for(FileSearchRecursiveTask task : recursiveTasks) {
                  filetedFileList.addAll(task.join());
                }

        }
        return filetedFileList;

    }
}

This program works fine when directory doesn't have too many sub-directories and files but if its really big then it throws OutOfMemoryError.

My understanding is that max number of threads (including compensation threads) are bounded so why their is this error? Am i missing anything in my program?

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   

Solution

  • You should not fork new tasks beyond all recognition. Basically, you should fork as long as there’s a chance that another worker thread can pick up the forked job and evaluate locally otherwise. Then, once you have forked a task, don’t call join() right afterwards. While the underlying framework will start compensation threads to ensure that your jobs will proceed instead of just having all threads blocked waiting for a sub-task, this will create that large amount of threads that may exceed the system’s capabilities.

    Here is a revised version of your code:

    public class DirectoryService {
    
        public static void main(String[] args) {
            FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
            List<String> files = task.invoke();
            System.out.println("Total no of files with hello " + files.size());
        }
    
    }
    
    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private static final int TARGET_SURPLUS = 3;
        private File path;
        public FileSearchRecursiveTask(File file) {
            this.path = file;
        }
    
        @Override
        protected List<String> compute() {
            File directory = path;
            if(directory.isDirectory() && directory.canRead()) {
                System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
                return scan(directory);
            }
            return Collections.emptyList();
        }
    
        private List<String> scan(File directory)
        {
            File[] fileList = directory.listFiles();
            if(fileList == null || fileList.length == 0) return Collections.emptyList();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            List<String> filteredFileList = new ArrayList<>();
            for(File file: fileList) {
                System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                if(file.isDirectory())
                {
                    if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                    {
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                        recursiveTasks.add(task);
                        task.fork();
                    }
                    else filteredFileList.addAll(scan(file));
                }
                else if(file.getName().contains("hello")) {
                    filteredFileList.add(file.getAbsolutePath());
                }
            }
    
            for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
                FileSearchRecursiveTask task = recursiveTasks.get(ix);
                if(task.tryUnfork()) task.complete(scan(task.path));
            }
    
            for(FileSearchRecursiveTask task: recursiveTasks) {
                filteredFileList.addAll(task.join());
            }
            return filteredFileList;
        }
    }
    

    The method doing the processing has been factored out into a method receiving the directory as parameter, so we are able to use it locally for arbitrary directories not necessarily being associated with a FileSearchRecursiveTask instance.

    Then, the method uses getSurplusQueuedTaskCount() to determine the number of locally enqueued tasks which have not been picked up by other worker threads. Ensuring that there are some helps work balancing. But if this number exceeds the threshold, the processing will be done locally without forking more jobs.

    After the local processing, it iterates over the tasks and uses tryUnfork() to identify jobs which have not been stolen by other worker threads and process them locally. Iterating backwards to start this with the youngest jobs raises the chances to find some.

    Only afterwards, it join()s with all sub-jobs which are now either, completed or currently processed by another worker thread.

    Note that I changed the initiating code to use the default pool. This uses “number of CPU cores” minus one worker threads, plus the initiating thread, i.e. the main thread in this example.