Search code examples
javamultithreadingparallel-processingbubble-sort

Odd-Even sort Java using multithreading


I am new to this group, so I believe it is a possibility to get help here since I could not find any information about my question on Google.

I am trying to implement Java EvenOdd transposition sort parallel. Therefore, as I algorithm I thought that dividing into partitions would be a great idea:

  1. How to know how many parts should I divide my array list? For example, I use 17 elements for now to make it more understandable.
  2. Also, I do not know if I should use something called ExecutorService or just create threads normally.

I add my current logic here: Start from the Odd phase and divide into two parts and assign two threads to make these comparisons, after that create a Barrier to wait for threads and therefore start other threads to work with the even indexes similarly. Thanks for any help that you could give me. Currently, I do not know how to implement this algorithm, so any words might help.enter image description here


Solution

  • How to know how many parts should I divide my array list? For example, I use 17 elements for now to make it more understandable.

    Your intuition to divide the array into subarrays is correct, as it is often the basis for concurrent sorting algorithms. As you know the algorithm already, we only have to discuss the implementation :

    1. The intuitive solution would be to create a thread for every odd index, start() all of them for the compare-and-swap and join() them to the main thread to wait on the result. Rince and repeat this N times. This is very inefficient, however, as the overhead of creating and starting all of the O(N^2) threads is far to big for the fast compare-and-swap.
    2. We can also create threads for every odd index, and make them repeatedly compare-and-swap between left and right. This is problematic as we would have to lock left and right repeatedly (to prevent data races), would lead to a lot of useless overhead with the scheduler and we wouldn't know when we are finished.
    3. Lastly, we can create threads for every odd index, also make them repeatedly alternate between left and right and, every time, make them wait on a barrier. This seems for me to be the correct option as it minimizes thread management overhead and also limits useless comparisons and data races.

    This leads us to the following code :

    import java.util.Arrays;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class OddEvenSort {
        public static void main(String[] args) {
            int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
            sortArr(arr);
            System.out.println(Arrays.toString(arr));
        }
        
        public static void sortArr(int[] arr) {
            int threadNum = arr.length/2;
            CyclicBarrier barr = new CyclicBarrier(threadNum);
            Thread[] threads = new Thread[threadNum];
            for (int i = 0; i < threadNum; i++) {
                threads[i] = new Thread(new CompareSwapThread(arr, 2*i + 1, barr));
                threads[i].start();
            }
            for (int i = 0; i < threadNum; i++) {
                try {
                    threads[i].join();
                } catch (InterruptedException e) {e.printStackTrace();}
            }
        }
    }
    
    class CompareSwapThread implements Runnable {
        private int[] arr;
        private int index;
        private CyclicBarrier barr;
        
        public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
            this.arr = arr;
            this.index = index;
            this.barr = barr;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < arr.length; i++) {
                if (arr[index - 1] > arr[index]) {
                    int t = arr[index - 1];
                    arr[index - 1] = arr[index];
                    arr[index] = t;
                }
                try {
                    barr.await();
                } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
                if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                    int t = arr[index];
                    arr[index] = arr[index + 1];
                    arr[index + 1] = t;
                }
                try {
                    barr.await();
                } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            }
        }   
    }
    

    Notice that this algorithm has a runtime of O(n) which is not the best for such a parallel algorithm. Another algorithm you can try to implement in parrallel is the MergeSort algorithm. There are a lot of things you can parallelize with this one, but the most important one is the merging, as it is the bottleneck in the sequential algorithm. You can look at Batcher Odd-Even Mergesort or look at other parallel merges.

    Also, I do not know if I should use something called ExecutorService or just create threads normally.

    Java provides a lot of different tools for parallelism, which operate at different levels of abstraction. One could say that ExecutorService is more 'high-level' than basic threads, as it simplifies the thread managment. It also will optimize scheduling of tasks, as to make execution better.

    Here is our implementation, using ExecutorService :

    import java.util.Arrays;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class OddEvenSort {
        public static void main(String[] args) {
            int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
            sortArr(arr);
            System.out.println(Arrays.toString(arr));
        }
        
        public static void sortArr(int[] arr) {
            int threadNum = arr.length/2;
            CyclicBarrier barr = new CyclicBarrier(threadNum);
            ExecutorService exec = Executors.newFixedThreadPool(threadNum);
            Future<?>[] awaits = new Future<?>[threadNum];
            for (int i = 0; i < threadNum; i++) {
                awaits[i] = exec.submit(new CompareSwapThread(arr, 2*i + 1, barr));
            }
            for (int i = 0; i < threadNum; i++) {
                try {
                    awaits[i].get();
                } catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
            }
        }
    }
    
    class CompareSwapThread implements Runnable {
        private int[] arr;
        private int index;
        private CyclicBarrier barr;
        
        public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
            this.arr = arr;
            this.index = index;
            this.barr = barr;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < arr.length; i++) {
                if (arr[index - 1] > arr[index]) {
                    int t = arr[index - 1];
                    arr[index - 1] = arr[index];
                    arr[index] = t;
                }
                try {
                    barr.await();
                } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
                if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                    int t = arr[index];
                    arr[index] = arr[index + 1];
                    arr[index + 1] = t;
                }
                try {
                    barr.await();
                } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            }
        }   
    }
    

    As you can see, we are using the thread factory newFixedThreadPool static method to generate and intantiate all the treads. We then add our tasks to the thread pool, which will return a Future variable. A Future will hold the value, when the thread finished (in our case null). Calling the Future.get() method will wait for the result (and thus the thread to be finished). Notice that is you want to implement some sort of nested thread parralelism (for example, when parallelizing MergeSort). You should use ForkJoinPool as it is made specifically for that. Finally, here is a good tutorial about ExecutorService.