I'm making an application that contains of two threads: one of them writes a value to LinkedBlockingQueue, another is reading. I'm using ScheduledExecutorService for running this operations in some period in seconds. The problem is that my application is freezing on the method take of BlockingQueue and i cant understand why.
This is a common resourse:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
This is reader
Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Writer:
Runnable writer = ()->{
res.q.add("hi");
};
Full code:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
Res res = new Res();
Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writer = ()->{
res.q.add("hi");
};
Random rnd = new Random();
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(writer,time, TimeUnit.SECONDS);
}
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(reader,time, TimeUnit.SECONDS);
}
executorService.shutdown();
}
It should print twenty lines "hi [number]", but freezes on some line. For example, my current print:
hi 1
hi 2
hi 3
hi 4
hi 5
I found out If i increase count of threads newScheduledThreadPool(20)
it starts work, but how can I make it with two threads? Thanks!
It's a bit hard to follow your code, though it is obvious at the same time what is going on. You can run two threads at a time, at most, because of Executors.newScheduledThreadPool(2);
. Both of these threads are reader
threads.
So Thread-1
entered the try
block and acquired the semaphore permit via semaphore.acquire();
, but the queue was empty - as such it blocks on res.q.take()
. The next thread - Thread-2
is a reader thread too, but it can not acquire a permit
, since it is already taken by Thread-1
and is blocked on semaphore.acquire();
. Since you have no room for other Threads (you pool is blocked working with these two Threads), there are no writers that would put something in your queue and as such unblock Thread-1
(so that res.q.take()
would work).
Adding more worker Threads just delays the problem - you could end up in the same position as you were before.