Search code examples
javajava-8java-streamfuturespliterator

Java Spliterator : How to process large Stream splits equally?


The code I'm working with

package com.skimmer;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class App {

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");

    Spliterator<String> spliterator = test.parallel().spliterator();
    List<Callable<Long>> callableList = new ArrayList<Callable<Long>>();

    // Creating a future for each split to process concurrently
    int totalSplits = 0;
    while ((spliterator = spliterator.trySplit()) != null) {

      totalSplits++;
      callableList.add(new Worker(spliterator, "future-" + totalSplits));
    }

    ExecutorService executor = Executors.newFixedThreadPool(totalSplits);
    List<Future<Long>> futures = executor.invokeAll(callableList);
    AtomicLong counter = new AtomicLong(0);

    for (Future<Long> future : futures)
      counter.getAndAdd(future.get());

    System.out.println("Total processed " + counter.get());
    System.out.println("Total splits " + totalSplits);

    executor.shutdown();
  }

  public static class Worker implements Callable<Long> {

    private Spliterator<String> spliterator;
    private String name;

    public Worker(Spliterator<String> spliterator, String name) {
      this.spliterator = spliterator;
      this.name = name;
    }

    @Override
    public Long call() {

      AtomicLong counter = new AtomicLong(0);
      spliterator.forEachRemaining(s -> {

        // We'll assume busy processing code here
        counter.getAndIncrement();

      });

      System.out.println(name + " Total processed : " + counter.get());

      return counter.get();
    }
  }
}

The output

furture-11 Total processed : 244
furture-10 Total processed : 488
furture-9 Total processed : 977
furture-12 Total processed : 122
furture-7 Total processed : 3906
furture-13 Total processed : 61
furture-8 Total processed : 1953
furture-6 Total processed : 7813
furture-14 Total processed : 31
furture-5 Total processed : 15625
furture-15 Total processed : 15
furture-4 Total processed : 31250
furture-17 Total processed : 4
furture-18 Total processed : 2
furture-19 Total processed : 1
furture-16 Total processed : 8
furture-3 Total processed : 62500
furture-2 Total processed : 125000
furture-1 Total processed : 250000
future-0 Total processed : 500000
Total processed 1000000
Total splits 20

My problem/Question : The first trySplit (and future task 'future-0') gets exactly n/2 total elements to begin processing. The first couple splits take a long time to complete - this gets worse as n grows. Is there any other way to process a stream where each future/callable gets an equal distribution of elements to process such as (N/splits) ie. 1000000/20 = 50000

Desired results

furture-11 Total processed : 50000
furture-10 Total processed : 50000
furture-9 Total processed : 50000
furture-12 Total processed : 50000
furture-7 Total processed : 50000
furture-13 Total processed : 50000
furture-8 Total processed : 50000
furture-6 Total processed : 50000
furture-14 Total processed : 50000
furture-5 Total processed : 50000
furture-15 Total processed : 50000
furture-4 Total processed : 50000
furture-17 Total processed : 50000
furture-18 Total processed : 50000
furture-19 Total processed : 50000
furture-16 Total processed : 50000
furture-3 Total processed : 50000
furture-2 Total processed : 50000
furture-1 Total processed : 50000
future-0 Total processed : 50000
Total processed 1000000
Total splits 20

Follow up question : If Spliterator is unable to do this what other approach/solution would be best used to process large streams concurrently.

Practical case scenario : Processing a large (6GB) CSV file that is too large to hold in memory


Solution

  • You are getting perfectly balanced splits here. The problem is, each time you split a sequence of elements into two halves, represented by two Spliterator instances, you create a job for one of the halves, not even attempting to split it further, but only subdividing the other halve.

    So right after the first split, you create a job covering 500,000 elements. Then, you call trySplit on the other 500,000 elements, getting a perfect split into two chunks of 250,000 elements, create another job covering one chunk of 250,000 elements and only try to subdivide the other. And so on. It's your code creating unbalanced jobs.

    When you change your first part to

    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");
    // Creating a future for each split to process concurrently
    List<Callable<Long>> callableList = new ArrayList<>();
    int workChunkTarget = 5000;
    Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
    spliterators.add(test.parallel().spliterator());
    int totalSplits = 0;
    while(!spliterators.isEmpty()) {
        Spliterator<String> spliterator = spliterators.pop();
        Spliterator<String> prefix;
        while(spliterator.estimateSize() > workChunkTarget
                  && (prefix = spliterator.trySplit()) != null) {
            spliterators.push(spliterator);
            spliterator = prefix;
        }
        totalSplits++;
        callableList.add(new Worker(spliterator, "future-" + totalSplits));
    }
    

    you get quiet close to your desired target workload size (as close as we can, given that the numbers are not powers of two).

    The Spliterator design works much smoother with tools like ForkJoinTask, where a new job can be submitted after each successful trySplit and the job itself will decide to split and spawn new jobs concurrently when the worker threads are not saturated (like parallel stream operations are done in the reference implementation).