
Why does a parallel Java Stream with a short-circuit operation evaluate all elements of the Stream while a sequential Stream does not?


Consider the two test methods parallel() and sequential():

  @Test
  public void parallel() throws Exception
  {
    System.out.println( "parallel start." );
    IntStream.of( 0, 1 ).parallel().map( this::work ).findAny();
    System.out.println( "parallel done." );
  }

  @Test
  public void sequential() throws Exception
  {
    System.out.println( "sequential start." );
    IntStream.of( 0, 1 ).map( this::work ).findAny();
    System.out.println( "sequential done." );
  }

  private int work(int i)
  {
    System.out.println( "working... " + i );
    Threads.sleepSafe( i * 1000 );
    System.out.println( "worked. " + i );
    return i;
  }

Threads.sleepSafe() is a simple wrapper around Thread.sleep(), which swallows the exception and does nothing if 0 is passed.
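
For reference, a minimal sketch of what Threads.sleepSafe might look like, reconstructed from the description above and the log lines in the output below (the exact implementation is an assumption, not the original code):

```java
// Hypothetical reconstruction of the Threads.sleepSafe helper:
// does nothing for 0, otherwise sleeps and swallows the exception.
public final class Threads {
    private Threads() {}

    public static void sleepSafe(long millis) {
        if (millis <= 0) {
            return; // 0 means "no sleep", per the description above
        }
        System.out.println("sleeping for " + millis + " ms ...");
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // swallow the exception, but restore the interrupt flag
            Thread.currentThread().interrupt();
        }
        System.out.println("slept for " + millis + " ms.");
    }
}
```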

When the test methods are run, the result is this:

sequential start.
working... 0
worked. 0
sequential done.

parallel start.
working... 1
working... 0
worked. 0
sleeping for 1000 ms ...
slept for 1000 ms.
worked. 1
parallel done.

sequential() operates as I would expect, but parallel() does not: I would expect findAny() in parallel() to return as soon as work() returns for the first time (i.e. for value 0, because it does not sleep), but instead it only returns after work() is also completed for value 1.

Why?

Is there a way to make findAny() return as soon as work() returns for the first time?


Solution

  • A parallel stream still supports short-circuiting, but there would be no advantage in using a parallel stream if all threads deferred their work until the thread(s) processing previous elements acknowledged that the operation had not yet ended.

    Therefore, it is expected behavior that a parallel stream processes an unspecified number of elements more than necessary, as long as the final result is assembled correctly, i.e. by dropping the excess elements.

    It’s just that in your example, which consists of only two elements, processing one element more than necessary can be interpreted as “all elements are processed”.

    There’s generally little benefit in parallel processing when the number of elements is small and/or the actual operation is to find something that will predictably be among the first elements of the stream. Things get more interesting if you do something like

    IntStream.range(0, 2000).parallel()
        .map(i -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); return i;})
        .filter(i->i%397==396)
        .findAny();
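
To make the excess processing observable, you can count the map invocations with an AtomicInteger (the counter and the class wrapping it are additions for illustration; the exact count varies from run to run):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ExcessElements {
    // Returns {foundValue, numberOfElementsMapped}
    static int[] run() {
        AtomicInteger processed = new AtomicInteger();
        int result = IntStream.range(0, 2000).parallel()
            .map(i -> { processed.incrementAndGet(); return i; })
            .filter(i -> i % 397 == 396)
            .findAny()
            .getAsInt();
        return new int[] { result, processed.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        // Short-circuiting still works: typically far fewer than all
        // 2000 elements get mapped, but usually more than a sequential
        // stream would have needed to reach the first match.
        System.out.println("found " + r[0] + " after mapping "
            + r[1] + " of 2000 elements");
    }
}
```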
    

    Note that the terminal operation will wait for the completion of all worker threads before returning the final result, so if the evaluation of an element has already started by the time a result is found, that element’s processing will run to completion. This is by design: it ensures that there is no concurrent access to the source collection, or to other data accessed by your lambda expressions, once your application code proceeds after the stream operation.

    Compare with the package documentation:

    In almost all cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning. Only the terminal operations iterator() and spliterator() are not; …

    So a short-circuiting parallel stream does not process all elements, but it may still take longer to return the already evaluated result while other worker threads finish processing elements that are no longer needed.

    If you want an early return, accepting potentially still running background threads, the Stream API is not the right tool. Consider

    private int work(int i) throws InterruptedException {
        System.out.println( "working... " + i );
        Thread.sleep(i * 1000);
        System.out.println( "worked. " + i );
        return i;
    }
    public void parallel() throws Exception {
        System.out.println( "parallel start." );
        // build a list of jobs instead of a stream pipeline
        List<Callable<Integer>> jobs = IntStream.range(0, 100)
          .collect(ArrayList::new, (l,i) -> l.add(() -> work(i)), List::addAll);
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // invokeAny returns the result of the first successfully
        // completed job and cancels all others via interruption
        Integer result = pool.invokeAny(jobs);
        pool.shutdown();
        System.out.println( "parallel done, result="+result );
    }
    
    

    Note that this not only returns immediately after the first job has completed, it also supports cancellation of all still-running jobs via interruption.
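
A quick demonstration of that cancellation behavior (the class name and job values are made up for illustration): one fast job finishes immediately, invokeAny returns its result, and the slow jobs are interrupted instead of sleeping out their full ten seconds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InvokeAnyDemo {
    // Submits one fast job and five slow ones; invokeAny returns the
    // fast job's result and interrupts the still-sleeping slow jobs.
    static int run() throws Exception {
        List<Callable<Integer>> jobs = new ArrayList<>();
        jobs.add(() -> 42); // completes immediately
        for (int n = 0; n < 5; n++) {
            jobs.add(() -> {
                Thread.sleep(10_000); // interrupted once a result exists
                return -1;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(6);
        try {
            return pool.invokeAny(jobs);
        } finally {
            pool.shutdown();
            // terminates quickly because the slow jobs were interrupted
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        int result = run();
        System.out.println("result=" + result + " after "
            + (System.currentTimeMillis() - start) + " ms");
    }
}
```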