Search code examples
javamultithreadingproducer-consumer

Producer-Consumer with semaphores gets deadlocked


I'm trying to create Producer-Consumer with semaphores only. With following code

public class Application {

    public static int id = 0;

    public static void main(String[] args) {

        Semaphore producerSem = new Semaphore(1);
        Semaphore consumerSem = new Semaphore(1);
        Queue<Integer> line = new LinkedList<>();

        Runnable produce = () -> {
            try {
                producerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " producing");
                while (line.size() > 10) continue;
                Thread.sleep(2000);
                line.offer(id);
                System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
                id++;
                System.out.println(Thread.currentThread().getName() + " finished producing");
                producerSem.release();

            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consume = () -> {
            try {
                consumerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " consuming");
                while (line.size() < 1) continue;
                int product = line.remove();
                System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + " finished consuming");
                consumerSem.release();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
}

it gets deadlocked which I can't debug as it goes to id 10 and stops, so it looks like one element is removed but it won't manage to move forward. With breakpoint anywhere it works fine, even at the end of any task


Solution

  • I think is a nice exercice use only semaphores to write thread safe code.

    You should protect three aspects:

    • the id is a simple global integer and only one process should write at once.
    • add or remove jobs to the queue should be protected also.
    • although we can use the queue protection to check availability, an elegant way to wait for new jobs is use a synchronized counter (a semaphore). In this way, we do not adquire the queue only for check.

    The modified code could be:

    // our `id` counter...
    private static int id = 0;
    // ...and their semaphore for thread safety
    private static Semaphore idSemaphore = new Semaphore(1);
    
    // jobs queue...
    private static Queue<Integer> lineJobs = new LinkedList<>();
    // ...and their semaphore for thread safety
    private static Semaphore lineSemaphore = new Semaphore(1);
    
    // thread safe counter for (thread safety) jobs availability check
    private static Semaphore lineSemaphoreAvailable = new Semaphore(0);
    
    public static void main(String[] args) {
    
    
        Runnable produce = () -> {
    
            // we will set `myId` only once
            final int myId;
    
            // create a new id
            try {
                // wait to be granted to touch the global (unsafe integer) `id`
                idSemaphore.acquire();
                myId = id++;
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            } finally {
                // with exception or not release the semaphore
                idSemaphore.release();
            }
    
            System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);
    
            // producing job...
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
    
            // store the job
            try {
                // wait to be granted to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
                lineJobs.add(myId);
    
                // notify to consumers a new one job is available to peek
                lineSemaphoreAvailable.release();
    
                System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            } finally {
                lineSemaphore.release();
            }
        };
    
        Runnable consume = () -> {
    
            final int myId;
    
            // get a job
            try {
                // wait for a new job to become available
                lineSemaphoreAvailable.acquire();
                
                // wait to be granted to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
                myId = lineJobs.remove();
                System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            } finally {
                lineSemaphore.release();
            }
    
            System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
        };
    
        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
    

    with output

    Producer - 0: producing job #0...
    Producer - 1: producing job #1...
    ...
    Producer - 46: producing job #45...
    Producer - 45: producing job #44...
    Producer - 9: job #9 ready to do!
    Producer - 44: producing job #46...
    Producer - 47: producing job #47...
    Producer - 48: producing job #48...
    Producer - 49: producing job #49...
    Consumer - 0: job #9 adquired
    Consumer - 0: job #9 finished!
    Producer - 14: job #14 ready to do!
    Consumer - 1: job #14 adquired
    Consumer - 1: job #14 finished!
    Producer - 50: producing job #50...
    Producer - 51: producing job #51...
    ...
    Producer - 72: producing job #72...
    Producer - 39: job #39 ready to do!
    Consumer - 2: job #39 adquired
    Consumer - 2: job #39 finished!
    Producer - 73: producing job #73...
    Producer - 74: producing job #74...
    Producer - 75: producing job #75...
    Producer - 10: job #10 ready to do!
    Producer - 40: job #40 ready to do!
    Consumer - 3: job #10 adquired
    Consumer - 3: job #10 finished!
    Producer - 76: producing job #76...
    Producer - 77: producing job #77...
    ...
    Producer - 82: producing job #82...
    Producer - 83: producing job #83...
    Producer - 84: producing job #84...
    Consumer - 4: job #40 adquired
    Consumer - 4: job #40 finished!
    Producer - 85: producing job #85...
    Producer - 86: producing job #86...
    Producer - 71: job #71 ready to do!
    Producer - 87: producing job #87...
    Consumer - 5: job #71 adquired
    Consumer - 5: job #71 finished!
    Producer - 88: producing job #88...
    Producer - 92: producing job #92...
    Producer - 91: producing job #91...
    ...
    Producer - 96: producing job #95...
    Producer - 98: producing job #98...
    Producer - 99: producing job #99...
    Producer - 84: job #84 ready to do!
    Consumer - 6: job #84 adquired
    Consumer - 6: job #84 finished!
    ...
    Consumer - 97: job #63 adquired
    Consumer - 97: job #63 finished!
    Producer - 90: job #90 ready to do!
    Consumer - 98: job #90 adquired
    Consumer - 98: job #90 finished!
    Producer - 87: job #87 ready to do!
    Consumer - 99: job #87 adquired
    Consumer - 99: job #87 finished!
    
    Process finished with exit code 0