Circular buffer with producer and consumer threads: it gets stuck on some executions


I'm developing a circular buffer with two threads: a consumer and a producer. I'm using busy waiting with Thread.yield. I know this can be done with semaphores, but I want the buffer to work without semaphores.

Both share one variable: bufferCircular.

While the buffer is not full, the producer writes data at position p of the array, and while there is unread data, the consumer reads data from position c of the array. The variable nElem in BufferCircular is the number of values that haven't been read yet.

The program works fine in about 9 out of 10 runs. Sometimes, though, it gets stuck in an infinite loop before showing the last element on screen (number 500 of the for loop), or it just doesn't show any element.

I think it is probably a livelock, but I can't find the mistake.

Shared Variable:

public class BufferCircular {
    volatile int[] array;
    volatile int p;
    volatile int c;
    volatile int nElem;

    public BufferCircular(int[] array) {
       this.array = array;
       this.p = 0;
       this.c = 0;
       this.nElem = 0;
    }

    public void writeData (int data) {
       this.array[p] = data;
       this.p = (p + 1) % array.length;
       this.nElem++;
    }

    public int readData() {
       int data = array[c];
       this.c = (c + 1) % array.length;
       this.nElem--;
       return data;
    }

}

Producer Thread:

public class Producer extends Thread {
    BufferCircular buffer;
    int bufferTam;
    int contData;

    public Producer(BufferCircular buff) {
       this.buffer = buff;
       this.bufferTam = buffer.array.length;
       this.contData = 0;

    }

    public void produceData() {
       this.contData++;
       this.buffer.writeData(contData);
    }

    public void run() {
       for (int i = 0; i < 500; i++) {
          while (this.buffer.nElem == this.bufferTam) {
             Thread.yield();
          }
          this.produceData();
       }
    }
}

Consumer Thread:

    public class Consumer extends Thread {
        BufferCircular buffer;
        int cont;

        public Consumer(BufferCircular buff) {
           this.buffer = buff;
           this.cont = 0;
        }

        public void consumeData() {
           int data = buffer.readData();
           cont++;
           System.out.println("data  " + cont + ": " + data);
        }

        public void run() {
           for (int i = 0; i < 500; i++) {
              while (this.buffer.nElem == 0) {
                 Thread.yield();
              }
               this.consumeData();
           }
        }
   }

Main:

import java.util.Random;

public class Main {

    public static void main(String[] args) {
       Random ran = new Random();
       int tamArray = ran.nextInt(21) + 1;
       int[] array = new int[tamArray];

       BufferCircular buffer = new BufferCircular(array);

       Producer producer = new Producer (buffer);
       Consumer consumer = new Consumer (buffer);

       producer.start();
       consumer.start();

       try {
           producer.join();
           consumer.join();
       } catch (InterruptedException e) {
           System.err.println("Error with Threads");
           e.printStackTrace();
       }

    }

}

Any help will be welcome.


Solution

  • The problem is that your BufferCircular methods are subject to race conditions. Take writeData(), for example. It executes in 3 steps, some of which are themselves not atomic:

    this.array[p] = data;             // 1
    this.p = (p + 1) % array.length;  // 2  not atomic
    this.nElem++;                     // 3  not atomic
    

    Suppose two threads enter writeData() at the same time. At step 1 they both see the same value of p, and both write to array[p]. The data written by the first thread is lost, because the second thread overwrites the same index. Then they execute step 2, and the result is unpredictable: p can end up incremented by 1 or by 2, since p = (p + 1) % array.length consists of 3 operations between which the threads can interleave. Then comes step 3. The ++ operator is not atomic either, even on a volatile field: it is a separate read and write behind the scenes. So nElem may also be incremented by 1 or by 2.

    So the result is unpredictable, which is why your program misbehaves.

    The simplest solution is to serialize the readData() and writeData() methods. To do this, declare them synchronized:

    public synchronized void writeData (int data) { //...
    public synchronized int readData () { //...
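A minimal sketch of a fully synchronized variant, assuming it may be called from any number of producer and consumer threads. The class name SynchronizedBufferCircular and the methods tryWrite and tryRead are hypothetical and not part of the question's code. They fold the full/empty check into the same lock as the update, so no other thread can slip in between the check and the write.

```java
// Hypothetical synchronized ring buffer: every access to p, c and nElem
// happens while holding the object's monitor.
class SynchronizedBufferCircular {
    private final int[] array;
    private int p;      // next write position
    private int c;      // next read position
    private int nElem;  // number of unread values

    SynchronizedBufferCircular(int capacity) {
        this.array = new int[capacity];
    }

    // Stores data and returns true, or returns false if the buffer is full.
    synchronized boolean tryWrite(int data) {
        if (nElem == array.length) {
            return false;
        }
        array[p] = data;
        p = (p + 1) % array.length;
        nElem++;
        return true;
    }

    // Returns the oldest unread value, or null if the buffer is empty.
    synchronized Integer tryRead() {
        if (nElem == 0) {
            return null;
        }
        int data = array[c];
        c = (c + 1) % array.length;
        nElem--;
        return data;
    }
}

class SynchronizedBufferDemo {
    public static void main(String[] args) {
        SynchronizedBufferCircular buffer = new SynchronizedBufferCircular(2);
        System.out.println(buffer.tryWrite(1)); // true
        System.out.println(buffer.tryWrite(2)); // true
        System.out.println(buffer.tryWrite(3)); // false, buffer is full
        System.out.println(buffer.tryRead());   // 1
    }
}
```

With this shape, a producer would spin with while (!buffer.tryWrite(x)) Thread.yield(); and a consumer would loop until tryRead() returns a non-null value.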
    

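    Even if you have only one producer thread and one consumer thread, race conditions can still occur on operations involving nElem: the producer's nElem++ and the consumer's nElem-- can interleave so that one update is lost, and the counter no longer matches the buffer contents. A solution is to use AtomicInteger instead of int: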

    final AtomicInteger nElem = new AtomicInteger();
    

    and use its incrementAndGet() and decrementAndGet() methods in place of ++ and --, reading the current value with get() in the wait loops.
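The AtomicInteger approach could be sketched as follows, under the assumption that exactly one thread writes and exactly one thread reads. The names AtomicBufferCircular, isFull, isEmpty and AtomicBufferDemo are hypothetical. With a single producer and a single consumer, p and c are each touched by only one thread, so only nElem needs to be atomic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-producer/single-consumer ring buffer.
// Assumption: one thread calls writeData, one other thread calls readData.
class AtomicBufferCircular {
    private final int[] array;
    private int p; // used only by the producer thread
    private int c; // used only by the consumer thread
    private final AtomicInteger nElem = new AtomicInteger();

    AtomicBufferCircular(int capacity) {
        this.array = new int[capacity];
    }

    boolean isFull()  { return nElem.get() == array.length; }
    boolean isEmpty() { return nElem.get() == 0; }

    void writeData(int data) {
        array[p] = data;
        p = (p + 1) % array.length;
        nElem.incrementAndGet(); // atomic; also makes the slot write visible
    }

    int readData() {
        int data = array[c];
        c = (c + 1) % array.length;
        nElem.decrementAndGet(); // atomic; frees the slot for the producer
        return data;
    }
}

class AtomicBufferDemo {
    // Sends 1..items through the buffer and returns the sum the consumer saw.
    static long run(int capacity, int items) {
        AtomicBufferCircular buffer = new AtomicBufferCircular(capacity);
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= items; i++) {
                while (buffer.isFull()) Thread.yield();
                buffer.writeData(i);
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < items; i++) {
                while (buffer.isEmpty()) Thread.yield();
                sum[0] += buffer.readData();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(run(8, 500)); // 125250, the sum of 1..500
    }
}
```

This sketch is not safe with several producers or several consumers, because p and c would then be shared; in that case the synchronized approach is the simpler choice.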