I'm trying to implement Producer-Consumer using only semaphores, with the following code:
public class Application {
    public static int id = 0;

    public static void main(String[] args) {
        Semaphore producerSem = new Semaphore(1);
        Semaphore consumerSem = new Semaphore(1);
        Queue<Integer> line = new LinkedList<>();
        Runnable produce = () -> {
            try {
                producerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " producing");
                while (line.size() > 10) continue;
                Thread.sleep(2000);
                line.offer(id);
                System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
                id++;
                System.out.println(Thread.currentThread().getName() + " finished producing");
                producerSem.release();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable consume = () -> {
            try {
                consumerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " consuming");
                while (line.size() < 1) continue;
                int product = line.remove();
                System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + " finished consuming");
                consumerSem.release();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
}
It deadlocks, and I can't debug it: it gets to id 10 and stops, so it looks like one element is removed but the program never manages to move forward. With a breakpoint anywhere, even at the end of either task, it works fine.
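I think it is a nice exercise to use only semaphores to write thread-safe code.
You should protect three aspects:
id is a simple global integer, and only one thread should write to it at a time.
lineJobs, the job queue, is not thread safe, so only one thread should touch it at a time.
Job availability: a consumer should block until a job is actually in the queue, which a counting semaphore starting at zero can track.
The modified code could be: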
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

public class Application {
    // our `id` counter...
    private static int id = 0;
    // ...and its semaphore for thread safety
    private static Semaphore idSemaphore = new Semaphore(1);
    // jobs queue...
    private static Queue<Integer> lineJobs = new LinkedList<>();
    // ...and its semaphore for thread safety
    private static Semaphore lineSemaphore = new Semaphore(1);
    // counts the jobs currently in the queue, so consumers can wait for one safely
    private static Semaphore lineSemaphoreAvailable = new Semaphore(0);

    public static void main(String[] args) {
        Runnable produce = () -> {
            // we will set `myId` only once
            final int myId;
            // create a new id
            try {
                // wait for permission to touch the global (unsafe integer) `id`
                idSemaphore.acquire();
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                myId = id++;
            } finally {
                // release only after a successful acquire
                idSemaphore.release();
            }
            System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);
            // producing job...
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            // store the job
            try {
                // wait for permission to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                lineJobs.add(myId);
                // notify consumers that a new job is available to take
                lineSemaphoreAvailable.release();
                System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
            } finally {
                lineSemaphore.release();
            }
        };
        Runnable consume = () -> {
            final int myId;
            // get a job
            try {
                // wait for a new job to become available
                lineSemaphoreAvailable.acquire();
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                // wait for permission to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
            } catch (InterruptedException e) {
                // give the unused job permit back before leaving
                lineSemaphoreAvailable.release();
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                myId = lineJobs.remove();
                System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
            } finally {
                lineSemaphore.release();
            }
            System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
        };
        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
}
With output:
Producer - 0: producing job #0...
Producer - 1: producing job #1...
...
Producer - 46: producing job #45...
Producer - 45: producing job #44...
Producer - 9: job #9 ready to do!
Producer - 44: producing job #46...
Producer - 47: producing job #47...
Producer - 48: producing job #48...
Producer - 49: producing job #49...
Consumer - 0: job #9 adquired
Consumer - 0: job #9 finished!
Producer - 14: job #14 ready to do!
Consumer - 1: job #14 adquired
Consumer - 1: job #14 finished!
Producer - 50: producing job #50...
Producer - 51: producing job #51...
...
Producer - 72: producing job #72...
Producer - 39: job #39 ready to do!
Consumer - 2: job #39 adquired
Consumer - 2: job #39 finished!
Producer - 73: producing job #73...
Producer - 74: producing job #74...
Producer - 75: producing job #75...
Producer - 10: job #10 ready to do!
Producer - 40: job #40 ready to do!
Consumer - 3: job #10 adquired
Consumer - 3: job #10 finished!
Producer - 76: producing job #76...
Producer - 77: producing job #77...
...
Producer - 82: producing job #82...
Producer - 83: producing job #83...
Producer - 84: producing job #84...
Consumer - 4: job #40 adquired
Consumer - 4: job #40 finished!
Producer - 85: producing job #85...
Producer - 86: producing job #86...
Producer - 71: job #71 ready to do!
Producer - 87: producing job #87...
Consumer - 5: job #71 adquired
Consumer - 5: job #71 finished!
Producer - 88: producing job #88...
Producer - 92: producing job #92...
Producer - 91: producing job #91...
...
Producer - 96: producing job #95...
Producer - 98: producing job #98...
Producer - 99: producing job #99...
Producer - 84: job #84 ready to do!
Consumer - 6: job #84 adquired
Consumer - 6: job #84 finished!
...
Consumer - 97: job #63 adquired
Consumer - 97: job #63 finished!
Producer - 90: job #90 ready to do!
Consumer - 98: job #90 adquired
Consumer - 98: job #90 finished!
Producer - 87: job #87 ready to do!
Consumer - 99: job #87 adquired
Consumer - 99: job #87 finished!
Process finished with exit code 0
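The code above leaves the queue unbounded, while the question capped it at about ten items with a busy-wait loop. That kind of loop reads a plain LinkedList with no synchronization, so the spinning thread is not guaranteed to ever see another thread's update, which is probably why it hangs when run without a breakpoint. A minimal sketch of a bounded variant, still using only semaphores, could look like the following. The names BoundedBufferSketch, BoundedBuffer, freeSlots, filledSlots and run are made up for illustration and do not come from the original code:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class BoundedBufferSketch {

    // Hypothetical helper: a plain queue guarded only by semaphores.
    static final class BoundedBuffer {
        private final Queue<Integer> items = new LinkedList<>();
        private final Semaphore mutex = new Semaphore(1);       // guards `items`
        private final Semaphore filledSlots = new Semaphore(0); // jobs ready to take
        private final Semaphore freeSlots;                      // room left in the queue

        BoundedBuffer(int capacity) {
            freeSlots = new Semaphore(capacity);
        }

        void put(int item) throws InterruptedException {
            freeSlots.acquire();            // blocks while the buffer is full
            mutex.acquireUninterruptibly(); // short critical section
            try {
                items.add(item);
            } finally {
                mutex.release();
            }
            filledSlots.release();          // tell consumers a job is ready
        }

        int take() throws InterruptedException {
            filledSlots.acquire();          // blocks while the buffer is empty
            mutex.acquireUninterruptibly();
            int item;
            try {
                item = items.remove();
            } finally {
                mutex.release();
            }
            freeSlots.release();            // tell producers a slot is free
            return item;
        }
    }

    // Starts `jobs` producers and `jobs` consumers; returns the consumed job ids.
    static List<Integer> run(int jobs, int capacity) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        List<Integer> consumed = new ArrayList<>();
        Semaphore consumedMutex = new Semaphore(1); // guards `consumed`
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < jobs; i++) {
            final int jobId = i;
            threads.add(new Thread(() -> {
                try {
                    buffer.put(jobId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer - " + i));
            threads.add(new Thread(() -> {
                try {
                    int job = buffer.take();
                    consumedMutex.acquireUninterruptibly();
                    try {
                        consumed.add(job);
                    } finally {
                        consumedMutex.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer - " + i));
        }
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Integer> result = run(100, 10);
        System.out.println("consumed " + result.size() + " jobs");
    }
}
```

Here freeSlots starts at the capacity and filledSlots at zero, so producers block while the queue is full and consumers block while it is empty, with no polling of the queue size.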