Search code examples
javamultithreadingproducer-consumerblockingqueue

How to avoid starvation in multi producer and consumer?


  1. Consider here 2 producer threads and one consumer thread.
  2. Suppose queue is full.
  3. Two producer thread goes to wait state because queue is full.
  4. Consumer thread takes element from queue and notifyAll so one of the producer thread adds element and comes out and another producer thread remains in wait state and again another producer threads add elements and coming out.
  5. So if you observe there are chances that one thread can be in wait state always.

How to avoid this ?

import java.util.LinkedList;
import java.util.List;

interface BlockingQueueCustom<E> {

      void put(E item)  throws InterruptedException ;

      E take()  throws InterruptedException;
}

class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E> {

    private List<E> queue;
    private int maxSize; // maximum number of elements queue can hold at a time.

    public LinkedBlockingQueueCustom(int maxSize) {
        this.maxSize = maxSize;
        queue = new LinkedList<E>();
    }

    public synchronized void put(E item) throws InterruptedException {

         while(queue.size() == maxSize) {
            this.wait();
        }

        queue.add(item);
        this.notifyAll();
    }

    public synchronized E take() throws InterruptedException {

        while(queue.size() == 0) {
            this.wait();
        }

        this.notifyAll();
        return queue.remove(0);

    }

}

public class BlockingQueueCustomTest {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueCustom<Integer> b = new LinkedBlockingQueueCustom<Integer>(10);
        System.out.println("put(11)");
        b.put(11);
        System.out.println("put(12)");
        b.put(12);
        System.out.println("take() > " + b.take());
        System.out.println("take() > " + b.take());

    }
}

Solution

  • There are several options. One is to use standard ArrayBlockingQueue calling two arguments constructor. In this case queue will process threads in FIFO order and producers will add elements in the queue in the same order they've tried to put elements into the queue.

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {...}
    

    Another option is to use ReentrantReadWriteLock with fairness parameter turned on instead of synchronized block.

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {...}
    

    This is an example of how to convert synchronized block into code with ReentrantReadWriteLock: https://javarevisited.blogspot.com/2015/06/java-lock-and-condition-example-producer-consumer.html