
How to build a very simple multithreaded pipeline in Java?


I am trying to run some filters on a matrix within a multithreaded pipeline to save some time. Sadly, there are apparent synchronization issues that I do not fully understand yet.

What I expect is that I can pass a matrix to my step() function and receive a matrix that has all filters applied to it (delayed by the pipeline length).

    private volatile short[][] stage1matrix, stage2matrix;

    public short[][] step(short[][] matrix) {

        short[][] res = stage2matrix; // stage matrix with all applied filters for output
        stage2matrix = stage1matrix; // take matrix with applied stage 1 filters for stage 2
        stage1matrix = matrix; // stage input matrix for stage 1 filters

        Thread stage2 = new Thread(() -> {
            applyArtifactFilter(stage2matrix);
        });
        stage2.setPriority(10);
        stage2.start();

        Thread stage1 = new Thread(() -> {
            applySaltFilter(stage1matrix);
            applyLineFilter(stage1matrix);
            applyMedFilter(stage1matrix);
            applyPreArtifactFilter(stage1matrix);
        });
        stage1.start();

        try {
            stage1.join();
            stage2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return res; // output matrix with all applied filters
    }
}

Also, as additional (and maybe unnecessary) information, I'm using parallelism in applyMedFilter in a possibly abusive way (I don't really think this causes the problem, but...):

        ArrayList<Integer> rowIndices = new ArrayList<Integer>();
        for (int i = kernel; i < matrix.length + kernel; i++) {
            rowIndices.add(i);
        }
        // Move window through all elements of the image
        rowIndices.parallelStream().forEach(i -> {
            for (int j = kernel; j < matrix[0].length + kernel; j++) {
                // do some rowwise independent work in parallel
                doSomeThingWith(matrix[i][j]);
            }
        });
    }

What happens instead is that I intermittently get a matrix back from the function with only some of the filters applied; in particular, applyMedFilter was sometimes not applied, which should not be possible. Because everything sometimes works and sometimes does not, I suspect a cache coherency problem. Also, commenting out applyArtifactFilter seems to resolve the problem.

Why is this not working as I am expecting and how could I do better?


Solution

  • The posted code can work correctly. The mistake was in the input to the function: the same matrix was passed in every time instead of a copy, so all variables referred to the same array.
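
To illustrate the aliasing described above, here is a minimal sketch. It assumes the filters modify the matrix in place, as the question's code suggests. The class name MatrixCopyDemo and the helpers deepCopy and inPlaceFilter are hypothetical stand-ins and not part of the original code. It shows that a plain assignment shares the underlying array, while a row-by-row copy does not.

```java
import java.util.Arrays;

public class MatrixCopyDemo {

    // Hypothetical helper: copies every row, so the result shares no data with src.
    // Note that src.clone() alone would be shallow: it copies only the outer array,
    // and the rows would still be shared.
    public static short[][] deepCopy(short[][] src) {
        short[][] dst = new short[src.length][];
        for (int i = 0; i < src.length; i++) {
            dst[i] = Arrays.copyOf(src[i], src[i].length);
        }
        return dst;
    }

    // Hypothetical stand-in for an in-place filter such as applySaltFilter:
    // it simply adds 1 to every cell.
    public static void inPlaceFilter(short[][] m) {
        for (short[] row : m) {
            for (int j = 0; j < row.length; j++) {
                row[j]++;
            }
        }
    }

    public static void main(String[] args) {
        short[][] frame = {{1, 2}, {3, 4}};

        short[][] alias = frame;          // same array object, no copy is made
        short[][] copy = deepCopy(frame); // independent data

        inPlaceFilter(alias);             // also changes frame, but not copy

        System.out.println(frame[0][0]); // prints 2
        System.out.println(copy[0][0]);  // prints 1
    }
}
```

Applied to the question, the caller would pass something like step(deepCopy(frame)) rather than the same frame on every call. Otherwise stage1matrix, stage2matrix and the returned res can all point at one array, and both stage threads end up filtering the same data at the same time.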