
ForkJoinPool BufferedImage Processing Style


I am trying to process an image using a ForkJoinPool in Java. I already use streams for some custom operations on the image, and now I want to use a ForkJoinPool for the getRGB and setRGB methods. How do I achieve parallelism in getRGB?

    @Override
    public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) {

        int[][] sol = new int[h][w];

        int threshold = w;

        class RecursiveSetter extends RecursiveAction {
            int from;
            int to;
            FJBufferedImage image;

            RecursiveSetter(int from, int to, FJBufferedImage image) {
                this.from = from;
                this.to = to;
                this.image = image;
            }

            @Override
            protected void compute() {
                System.out.println("From : " + from + " To : " + to);
                if (from >= to) return;

                if (to - from == 1) {
                    computeDirectly(from);
                    return;
                } else {
                    int mid = from + (to - from) / 2;
                    System.out.println("From : " + from + " To : " + to +
                            "Mid :" + mid);
                    invokeAll(
                            new RecursiveSetter(from, mid, image),
                            new RecursiveSetter(mid + 1, to, image));
                    return;
                }
            }

            void computeDirectly(int row) {
                sol[from] = image.getRealRGB(from, 0, w, 1, null, offset,
                        scansize);

            }
        }

        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        pool.invoke(new RecursiveSetter(0, h-1, this));
        return Arrays.stream(sol)
                .flatMapToInt(Arrays::stream)
                .toArray();
    }

getRealRGB just proxies to the corresponding method of BufferedImage. I understand this may be impractical, but I just want to know how to use a ForkJoinPool in this context. And yes, the above code throws an ArrayIndexOutOfBoundsException. Please suggest how to split the workload (by row, by column, or by small grid; right now I split by row) and how to decide on the threshold.


Solution

  • First some remarks regarding your attempt:

    int[][] sol = new int[h][w];
    

    Here you are creating a two-dimensional array, which in Java is a one-dimensional array with element type int[] that is already populated with sub-arrays of type int[]. Since you are going to overwrite the elements with sol[from] = /* something returning an int[] array */, allocating these sub-arrays is unnecessary. So in this case, you should use

    int[][] sol = new int[h][];
    

    instead. But recognizing the one-dimensional nature of the outer array also shows that a simple stream solution will do the job, i.e.

    int[][] sol = IntStream.range(yStart, yStart+h)
        .parallel()
        .mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize))
        .toArray(int[][]::new);
    

    This already does the job of distributing the workload on the available cores. It uses the Fork/Join framework behind the scenes, just like you tried to do, but that’s an implementation detail. You could fuse that with the next stream operation, e.g.

    return IntStream.range(yStart, yStart+h)
        .parallel()
        .flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize)))
        .toArray();
    

    though, if I understand the method signature correctly, you actually want to do

    public int[] getRGB(
           int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) {
    
        final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize];
        IntStream.range(yStart, yStart+h).parallel()
            // row y starts at offset + (y - yStart) * scansize, as in BufferedImage.getRGB
            .forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+(y-yStart)*scansize, scansize));
        return result;
    }
    

    to fulfill the contract. This also minimizes the number of copying operations. Since each query writes into a different region of the array, directly writing into the target array is thread safe.
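As a rough, self-contained illustration of the row-wise approach, the sketch below reads a region from a plain BufferedImage in parallel and compares it with the sequential result. The class name ParallelGetRgbDemo and the helper parallelGetRGB are made up for this example, and it assumes, as the answer does, that concurrent read-only getRGB calls on the same image are safe.

```java
import java.awt.image.BufferedImage;
import java.util.Arrays;
import java.util.stream.IntStream;

public class ParallelGetRgbDemo {

    /**
     * Reads a w x h region one row per stream element, using the same
     * target layout as BufferedImage.getRGB:
     * rgbArray[offset + (y - yStart) * scansize + (x - xStart)].
     */
    static int[] parallelGetRGB(BufferedImage img, int xStart, int yStart, int w, int h,
                                int[] rgbArray, int offset, int scansize) {
        final int[] result = rgbArray != null ? rgbArray : new int[offset + h * scansize];
        // "row" is relative to yStart, so every row writes into its own slice of result.
        IntStream.range(0, h).parallel()
            .forEach(row -> img.getRGB(xStart, yStart + row, w, 1,
                                       result, offset + row * scansize, scansize));
        return result;
    }

    public static void main(String[] args) {
        BufferedImage img = new BufferedImage(64, 48, BufferedImage.TYPE_INT_ARGB);
        for (int y = 0; y < img.getHeight(); y++) {
            for (int x = 0; x < img.getWidth(); x++) {
                img.setRGB(x, y, 0xFF000000 | (x << 8) | y); // distinct value per pixel
            }
        }
        int[] expected = img.getRGB(5, 7, 20, 10, null, 3, 24);
        int[] actual = parallelGetRGB(img, 5, 7, 20, 10, null, 3, 24);
        System.out.println("same as sequential getRGB: " + Arrays.equals(expected, actual));
    }
}
```

The region deliberately starts at a non-zero yStart and uses a non-zero offset, since those are the cases where a wrong row index calculation would show up.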

    This keeps the strategy of splitting by ranges of rows only. Sub-splitting rows is possible, but more complicated and rarely pays off. It would only help in the corner case of a caller requesting very few rows but lots of values per row. Even then, it is not clear whether the complicated sub-row splitting would pay off, because of memory locality issues.


    Regarding your original question, if you implement a ForkJoinTask directly, you may use getSurplusQueuedTaskCount() to decide whether to split again or compute directly.

    The choice of threshold is a trade-off between core utilization and the overhead caused by the number of task objects that have to synchronize. If the workload can be split perfectly evenly and no other unrelated thread or process uses CPU time, having a single item per core would be perfect. But in practice, the tasks never take exactly the same time, so having some spare split tasks to be executed by those cores that finished first is desirable. A typical threshold lies between 1 and 3 (remember that this is the number of queued tasks per core). For your kind of task, which has very even workloads, a smaller number could be used, e.g. stop splitting once there is another queue item.
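The splitting rule described above could look roughly like the following sketch. The names RowRangeTaskDemo, RowRangeTask, forkJoinGetRGB and SURPLUS_THRESHOLD are invented for illustration, and the threshold value of 1 is just the "stop once there is another queued item" suggestion, not a tuned number. The task uses a half-open row range [from, to) and splits it into [from, mid) and [mid, to), so no row is skipped or visited twice.

```java
import java.awt.image.BufferedImage;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

public class RowRangeTaskDemo {

    // Assumed value: keep splitting while fewer than this many surplus tasks are queued locally.
    static final int SURPLUS_THRESHOLD = 1;

    /** Copies rows [from, to) of the requested region; row indices are relative to yStart. */
    static final class RowRangeTask extends RecursiveAction {
        final BufferedImage img;
        final int xStart, yStart, w, from, to, offset, scansize;
        final int[] target;

        RowRangeTask(BufferedImage img, int xStart, int yStart, int w, int from, int to,
                     int[] target, int offset, int scansize) {
            this.img = img; this.xStart = xStart; this.yStart = yStart; this.w = w;
            this.from = from; this.to = to;
            this.target = target; this.offset = offset; this.scansize = scansize;
        }

        @Override
        protected void compute() {
            int rows = to - from;
            if (rows > 1 && ForkJoinTask.getSurplusQueuedTaskCount() < SURPLUS_THRESHOLD) {
                // Few tasks are waiting to be stolen: split the range in half.
                int mid = from + rows / 2;
                invokeAll(child(from, mid), child(mid, to));
            } else if (rows > 0) {
                // Enough tasks are queued (or a single row is left): read the rows directly.
                img.getRGB(xStart, yStart + from, w, rows,
                           target, offset + from * scansize, scansize);
            }
        }

        private RowRangeTask child(int lo, int hi) {
            return new RowRangeTask(img, xStart, yStart, w, lo, hi, target, offset, scansize);
        }
    }

    static int[] forkJoinGetRGB(BufferedImage img, int xStart, int yStart, int w, int h,
                                int[] rgbArray, int offset, int scansize) {
        int[] result = rgbArray != null ? rgbArray : new int[offset + h * scansize];
        // The common pool is reused here instead of creating a new pool per call.
        ForkJoinPool.commonPool().invoke(
            new RowRangeTask(img, xStart, yStart, w, 0, h, result, offset, scansize));
        return result;
    }

    public static void main(String[] args) {
        BufferedImage img = new BufferedImage(200, 150, BufferedImage.TYPE_INT_ARGB);
        for (int y = 0; y < img.getHeight(); y++) {
            for (int x = 0; x < img.getWidth(); x++) {
                img.setRGB(x, y, 0xFF000000 | (x << 8) | y);
            }
        }
        int[] expected = img.getRGB(10, 20, 100, 90, null, 0, 100);
        int[] actual = forkJoinGetRGB(img, 10, 20, 100, 90, null, 0, 100);
        System.out.println("same as sequential getRGB: " + Arrays.equals(expected, actual));
    }
}
```

Because the surplus count is only an estimate that depends on what the other workers are doing, the number and size of the leaf tasks will vary from run to run; the result array does not.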