I'm trying to read a string from a file, do an HTTP request with that string, and if the request returns a 200 then do another HTTP request with it.
I thought the producer-consumer model would be a good fit for this, but I'm totally stuck: the whole process just stops at a certain point and I have no idea why.
```java
public static void main(String[] args) throws InterruptedException, IOException {
    ArrayBlockingQueue<String> subQueue = new ArrayBlockingQueue<>(3000000);
    ThreadPoolExecutor consumers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    ThreadPoolExecutor producers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000000));
    consumers.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    String fileName = "test";
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = br.readLine()) != null) {
            String address = new JSONObject(line).getString("Address");
            // Producer task: first HTTP request; enqueue the address only on a 200
            producers.submit(new Thread(() -> {
                if (requestReturn200(address)) {
                    try {
                        subQueue.put(address);
                    } catch (InterruptedException e) {
                        System.out.println("Error producing.");
                    }
                }
            }));
        }
        producers.shutdown();
    }
    // Hand every queued address to the consumer pool for the second HTTP request
    while (subQueue.size() != 0 || !producers.isShutdown()) {
        String address = subQueue.poll(1, TimeUnit.SECONDS);
        if (address != null) {
            consumers.submit(new Thread(() -> {
                try {
                    System.out.println("Doing..." + address);
                    doOtherHTTPReqeust(address);
                } catch (Exception e) {
                    System.out.println("Fatal error consuming");
                }
            }));
        } else {
            System.out.println("Null");
        }
    }
    consumers.shutdown();
}
```
Any and all help would be greatly appreciated.
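```java
while (subQueue.size() != 0 || !producers.isShutdown()) {
```

First of all, `!producers.isShutdown()` will always evaluate to `false` (i.e. `!true`), because it is checked after `producers.shutdown()` has already been called. `isShutdown()` does not tell you whether tasks in the pool are still running; it only tells you whether the pool has been shut down, i.e. whether it has stopped accepting new tasks. To find out whether all submitted tasks have actually finished, you need `isTerminated()` or `awaitTermination(...)`.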
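To see the difference between `isShutdown()` and `isTerminated()`, here is a minimal sketch (the class and method names are made up for illustration): a single-threaded pool runs one task that blocks on a `CountDownLatch`, and the pool is shut down while that task is still pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownVsTerminated {

    /** Returns {isShutdown after shutdown(), isTerminated after shutdown(), isTerminated after awaitTermination()}. */
    public static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // A task that cannot finish until the latch is released.
        pool.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();                          // stop accepting new tasks; the pending task still runs
        boolean shutdown = pool.isShutdown();     // true immediately after shutdown()
        boolean terminated = pool.isTerminated(); // false: the task has not completed yet

        release.countDown(); // let the task complete
        boolean finished = false;
        try {
            finished = pool.awaitTermination(5, TimeUnit.SECONDS); // blocks until every task is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {shutdown, terminated, finished && pool.isTerminated()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("isShutdown=" + r[0] + ", isTerminated=" + r[1]
                + ", terminatedAfterAwait=" + r[2]);
    }
}
```

Run as-is, it should print `isShutdown=true, isTerminated=false, terminatedAfterAwait=true`.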
Second, `subQueue.size() != 0`. Your consumer-creating loop takes data from the queue much faster than the producers can supply it (each producer first has to wait for an HTTP response), so in the middle of the producing process the loop may already have emptied the queue, making `subQueue.size() != 0` false. Since the other half of the condition is already false, this ends the loop and no more consumer tasks are submitted: every address the producers enqueue after that point is never consumed.
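The race can be reproduced deterministically with a sketch like the following, assuming a `CountDownLatch` as a stand-in for a slow HTTP request (the class name and the `"example-address"` payload are hypothetical). The loop uses the same condition as the question, so it exits before the producer ever gets to `put()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SizeCheckRace {

    /** Returns {itemsConsumedByLoop, itemsLeftInQueue}. */
    public static int[] demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        ExecutorService producers = Executors.newSingleThreadExecutor();
        // Stands in for a slow HTTP request: the producer cannot put() until released.
        CountDownLatch slowRequest = new CountDownLatch(1);

        producers.submit(() -> {
            try {
                slowRequest.await();
                queue.put("example-address"); // hypothetical payload
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producers.shutdown();

        int consumed = 0;
        // Same condition as in the question: size() is 0 and isShutdown() is true,
        // so the loop body never runs even though an item is on its way.
        while (queue.size() != 0 || !producers.isShutdown()) {
            if (queue.poll() != null) {
                consumed++;
            }
        }

        slowRequest.countDown(); // the "request" finishes after the loop has already exited
        try {
            producers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {consumed, queue.size()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("consumed=" + r[0] + ", leftInQueue=" + r[1]);
    }
}
```

It should print `consumed=0, leftInQueue=1`: the address does arrive, but nothing is left to consume it.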
You should stop relying on `queue.size()` and use the blocking behavior of `BlockingQueue` instead: `queue.take()` will block until a new element is available.
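A minimal sketch of that blocking behavior (the class name and payload are illustrative): the consumer thread calls `take()` on an empty queue and simply parks until the main thread calls `put()`, with no polling and no `size()` checks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TakeBlocks {

    /** Returns the element the consumer thread received via take(). */
    public static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // parks here until put() below supplies an element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            Thread.sleep(200);             // the consumer is (almost certainly) blocked in take() by now
            queue.put("example-address");  // hypothetical payload; wakes the consumer
            consumer.join();               // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer got: " + demo());
    }
}
```

Whatever the timing, the consumer ends up with the element; the `sleep` only makes it likely that it was already waiting inside `take()` when `put()` happened.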
So the overall flow should be: consumers `queue.take()` data from the queue. This makes consumers wait automatically for new data and take it as soon as it becomes available. I'll set aside the point that creating 200 threads is excessive and, IMHO, at least in your case misses the whole purpose of producer/consumer task pools: the idea is to use a small number of threads, since threads are heavyweight, to run plenty of queued tasks. But that is a discussion for another time.
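Putting the pieces together, here is one possible sketch of that flow, not the only way to do it. Assumptions: `requestReturn200` and `doOtherHttpRequest` are hypothetical stubs standing in for the real HTTP calls, the input is an in-memory list instead of the file, and consumers are told to stop with a "poison pill" sentinel, a common technique that the answer above does not mention. Producers are awaited with `awaitTermination()` rather than checked with `isShutdown()`, and both pools are small and fixed-size.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PipelineSketch {

    // Sentinel ("poison pill") telling a consumer to stop; compared by identity, not equals().
    private static final String POISON = new String("__POISON__");

    // Hypothetical stand-in for the question's first HTTP request.
    static boolean requestReturn200(String address) {
        return !address.startsWith("bad");
    }

    // Hypothetical stand-in for the question's second HTTP request.
    static void doOtherHttpRequest(String address, List<String> processed) {
        processed.add(address);
    }

    public static List<String> run(List<String> addresses, int producerThreads, int consumerThreads) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService producers = Executors.newFixedThreadPool(producerThreads);
        ExecutorService consumers = Executors.newFixedThreadPool(consumerThreads);

        // Each consumer loops on take(), which blocks until an element is available.
        for (int i = 0; i < consumerThreads; i++) {
            consumers.execute(() -> {
                try {
                    while (true) {
                        String address = queue.take();
                        if (address == POISON) {
                            return; // no more work will ever arrive
                        }
                        doOtherHttpRequest(address, processed);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producers: first request, enqueue only the addresses that returned 200.
        for (String address : addresses) {
            producers.execute(() -> {
                if (requestReturn200(address)) {
                    try {
                        queue.put(address);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        try {
            producers.shutdown();
            // Wait until every producer task has finished, not merely until shutdown() was called.
            producers.awaitTermination(1, TimeUnit.HOURS);
            // One pill per consumer, so every consumer thread leaves its loop.
            for (int i = 0; i < consumerThreads; i++) {
                queue.put(POISON);
            }
            consumers.shutdown();
            consumers.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> result = run(List.of("a", "bad1", "b", "c", "bad2"), 4, 2);
        System.out.println(result.size() + " addresses processed");
    }
}
```

Running `main` should print `3 addresses processed`. The pool sizes are arbitrary; since the work is I/O-bound HTTP, you would tune them by measurement rather than by spawning hundreds of threads. Because the pills are put only after all producers have finished, and the queue is FIFO, every real address is taken before any consumer sees a pill. `List.of` requires Java 9 or later.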