java multithreading java-stream threadpool

How to create a thread pool for a stream operation


I would like to control which thread pool executes a parallel stream. Currently I have a list of strings:

List<String> mylist = new ArrayList<>(Arrays.asList("1", "2", "3", "4")); // holds the strings
List<Action> actions = new ArrayList<>(); // holds the functions that manipulate the strings from mylist

Each action has a work method that takes a String from mylist.

Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s-> actions.stream().map(ac -> ac.work(str)));
r = stream.collect(Collectors.toList());

This all works, but I have no control over the thread pool. I know I can use a ForkJoinPool, as in this example:

Example

But I haven't found a way to apply it to my code. This, for example, doesn't work:

ForkJoinPool customThreadPool = new ForkJoinPool(4);
r = customThreadPool.submit(
        () -> mylist.parallelStream().flatMap(s-> actions.stream().map(ac -> ac.work(str))).collect(Collectors.toList()));

It gives me this error:

java: incompatible types: no instance(s) of type variable(s) T,R,A,capture#1 of ?,T exist so that java.util.concurrent.ForkJoinTask<T> conforms to java.util.List<java.lang.String>

Solution

  • The code compiles and runs fine once the errors in it are fixed (the lambda parameter is s, so str must become s).

    Common Pool

    // Setup with dummy actions for testing which thread executes the action
    List<String> mylist = new ArrayList<>(Arrays.asList("1","2","3","4")); //that holds the strings 
    List<Action> actions = new ArrayList<>(Arrays.asList(
            s -> { s += "x";  System.out.println(Thread.currentThread().getName() + ": " + s); return s; },
            s -> { s += "y";  System.out.println(Thread.currentThread().getName() + ": " + s); return s; }
            ));
    
    // Using common pool
    Stream<String> stream = mylist.parallelStream();
    stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
    List<String> r = stream.collect(Collectors.toList());
    System.out.println(r);
    

    Output

    ForkJoinPool.commonPool-worker-7: 1x
    ForkJoinPool.commonPool-worker-3: 2x
    ForkJoinPool.commonPool-worker-3: 2y
    main: 3x
    ForkJoinPool.commonPool-worker-5: 4x
    main: 3y
    ForkJoinPool.commonPool-worker-7: 1y
    ForkJoinPool.commonPool-worker-5: 4y
    [1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]
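
For reference, the common-pool snippet above could be packaged as a complete program roughly as follows. This is only a sketch: the question never shows the Action type, so the interface here (a single work method from String to String) is an assumption, and the names CommonPoolDemo and applyAll are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CommonPoolDemo {

    // Assumed shape of the question's Action type (not shown in the original):
    // one abstract method, so lambdas can be used as actions.
    @FunctionalInterface
    interface Action {
        String work(String input);
    }

    // Applies every action to every string. The outer stream is parallel,
    // the inner stream over the actions is sequential.
    static List<String> applyAll(List<String> strings, List<Action> actions) {
        return strings.parallelStream()
                .flatMap(s -> actions.stream().map(ac -> ac.work(s)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> mylist = Arrays.asList("1", "2", "3", "4");
        List<Action> actions = Arrays.asList(s -> s + "x", s -> s + "y");
        // The list stream is ordered, so the result keeps encounter order
        // even though the work may be spread over several threads.
        System.out.println(applyAll(mylist, actions));
    }
}
```

The result order does not depend on which thread handled which element, which is why the printed list is the same in every variant shown here even though the per-thread log lines are interleaved differently.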
    

    Custom Pool

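    ForkJoinPool customThreadPool = new ForkJoinPool(4); // pool with parallelism 4
    ForkJoinTask<List<String>> task = customThreadPool.submit( // returns a task, not the list
            () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
    System.out.println(task.get()); // blocks until the result is ready
    customThreadPool.shutdown(); // release the worker threads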
    

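    Note that submit() returns a ForkJoinTask<List<String>>, not the list itself, which is why the result is read with task.get(). If the compiler complains as described in the question, you need to help it choose the correct overload of submit() by casting the lambda expression in the 3rd line: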

            (Callable<List<String>>) () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
    

    Output

    ForkJoinPool-1-worker-3: 3x
    ForkJoinPool-1-worker-1: 1x
    ForkJoinPool-1-worker-1: 1y
    ForkJoinPool-1-worker-5: 2x
    ForkJoinPool-1-worker-7: 4x
    ForkJoinPool-1-worker-7: 4y
    ForkJoinPool-1-worker-5: 2y
    ForkJoinPool-1-worker-3: 3y
    [1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]
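
Putting the custom-pool pieces together, a self-contained version might look like the sketch below. Several things in it are assumptions rather than part of the original code: UnaryOperator<String> stands in for the question's Action type, the names CustomPoolDemo and applyAllIn are hypothetical, and join() is used instead of get() only to avoid checked exceptions. Storing the lambda in a Callable variable has the same effect as the cast above, since it leaves only one matching submit() overload.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class CustomPoolDemo {

    // Runs the parallel stream from inside the given pool and waits for the result.
    // UnaryOperator<String> is a stand-in for the question's Action type.
    static List<String> applyAllIn(ForkJoinPool pool,
                                   List<String> strings,
                                   List<UnaryOperator<String>> actions) {
        // Typing the lambda as Callable selects submit(Callable<T>) unambiguously.
        Callable<List<String>> job = () -> strings.parallelStream()
                .flatMap(s -> actions.stream().map(ac -> ac.apply(s)))
                .collect(Collectors.toList());
        // submit() returns a ForkJoinTask; join() blocks and returns its result.
        return pool.submit(job).join();
    }

    public static void main(String[] args) {
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            List<String> mylist = Arrays.asList("1", "2", "3", "4");
            List<UnaryOperator<String>> actions = Arrays.asList(s -> s + "x", s -> s + "y");
            System.out.println(applyAllIn(customThreadPool, mylist, actions));
        } finally {
            // Shut the pool down even if the stream pipeline throws.
            customThreadPool.shutdown();
        }
    }
}
```

The try/finally matters for a pool you create yourself: unlike the common pool, it should be shut down explicitly once it is no longer needed.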
    

    Single Thread

    Stream<String> stream = mylist.stream();
    stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
    List<String> r = stream.collect(Collectors.toList());
    System.out.println(r);
    

    Output

    main: 1x
    main: 1y
    main: 2x
    main: 2y
    main: 3x
    main: 3y
    main: 4x
    main: 4y
    [1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]