
Reading from file in producer/consumer model


I'm trying to read a string from a file, do an HTTP request with that string, and if the request returns a 200 then do another HTTP request with it.

I thought the producer-consumer model would be a good fit for this, but I'm totally stuck. The whole process just stops at a certain point and I have no idea why.

public static void main(String[] args) throws InterruptedException, IOException {

    ArrayBlockingQueue<String> subQueue = new ArrayBlockingQueue<>(3000000);

    ThreadPoolExecutor consumers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    ThreadPoolExecutor producers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000000));
    consumers.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    String fileName = "test";
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = br.readLine()) != null) {
            String address = new JSONObject(line).getString("Address");
            producers.submit(new Thread(() -> {
                if (requestReturn200(address)) {
                    try {
                        subQueue.put(address);
                    } catch (InterruptedException e) {
                        System.out.println("Error producing.");
                    }
                }
            }));
        }
        producers.shutdown();
    }

    while (subQueue.size() != 0 || !producers.isShutdown()) {
        String address = subQueue.poll(1, TimeUnit.SECONDS);
        if (address != null) {
            consumers.submit(new Thread(() -> {
                try {
                    System.out.println("Doing..." + address);
                    doOtherHTTPReqeust(address);
                } catch (Exception e) {
                    System.out.println("Fatal error consuming");
                }
            }));

        } else {
            System.out.println("Null");
        }
    }

    consumers.shutdown();
}

Any and all help would be greatly appreciated.


Solution

  •  while (subQueue.size() != 0 || !producers.isShutdown()) {
    

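    First of all, !producers.isShutdown() will always be false, because it is checked after producers.shutdown(). isShutdown() does not tell you whether tasks in the pool are still running; it only tells you whether the pool has been shut down, meaning it no longer accepts new tasks. In your case isShutdown() is already true, so this half of the condition is always false. To find out whether the submitted tasks have actually finished, you would need isTerminated() or awaitTermination().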
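    To make the isShutdown()/isTerminated() difference concrete, here is a small sketch (not the asker's code). It assumes a single task that stays busy until a CountDownLatch is released, standing in for a slow HTTP request; the class and variable names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownVsTerminated {
    // Returns {isShutdown() right after shutdown(), isTerminated() at that same moment,
    //          awaitTermination() result once the task has been allowed to finish}.
    static boolean[] observe() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1); // keeps the task "busy" until released

        pool.submit(() -> {
            try {
                release.await(); // hypothetical stand-in for a slow HTTP request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();                          // stops accepting new tasks; the submitted one still runs
        boolean shutdown = pool.isShutdown();     // already true
        boolean terminated = pool.isTerminated(); // false: the task cannot finish before release
        release.countDown();
        boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS); // true once the task is done
        return new boolean[] {shutdown, terminated, finished};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = observe();
        System.out.println("isShutdown=" + r[0] + ", isTerminated=" + r[1] + ", awaitTermination=" + r[2]);
    }
}
```

    Running it should print isShutdown=true, isTerminated=false, awaitTermination=true, which is exactly the situation in the question: the pool reports "shut down" while its tasks are still working.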

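    Second, subQueue.size() != 0. Your consumer-submitting loop and the consumers take data from the queue much faster than the producers can provide it, because each producer first has to wait for an HTTP response. So in the middle of the "producing" process the consumers may already have emptied the queue, which makes subQueue.size() != 0 false. Since the other half of the condition is already false, this ends the loop, and no consumer tasks are ever submitted for the addresses the producers put into the queue afterwards.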
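    The early exit can be reproduced with a minimal sketch. It assumes one producer whose "HTTP request" is simulated by a CountDownLatch that is only released after the loop has been evaluated, which forces the unlucky timing deterministically (in the real program it depends on how slow the requests are). The loop condition is copied from the question; everything else is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PrematureExit {
    // Returns {items consumed by the loop, items left stranded in the queue afterwards}.
    static int[] run() throws InterruptedException {
        BlockingQueue<String> subQueue = new ArrayBlockingQueue<>(10);
        ExecutorService producers = Executors.newFixedThreadPool(1);
        CountDownLatch httpResponse = new CountDownLatch(1); // hypothetical stand-in for a slow HTTP call

        producers.submit(() -> {
            try {
                httpResponse.await();         // producer is still "waiting for the server"
                subQueue.put("some-address"); // hypothetical address
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producers.shutdown();

        int consumed = 0;
        // Same condition as in the question: !isShutdown() is already false and the queue
        // is still empty, so the body never runs.
        while (subQueue.size() != 0 || !producers.isShutdown()) {
            if (subQueue.poll(1, TimeUnit.SECONDS) != null) {
                consumed++;
            }
        }

        httpResponse.countDown();                        // the response arrives only now...
        producers.awaitTermination(5, TimeUnit.SECONDS); // ...and the producer enqueues its item
        return new int[] {consumed, subQueue.size()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("consumed=" + r[0] + ", left in queue=" + r[1]);
    }
}
```

    This should print consumed=0, left in queue=1: the item is produced, but nothing is left to consume it, which matches the "process just stops" symptom.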

    You should stop relying on queue.size() and instead use the blocking behavior of BlockingQueue: queue.take() blocks until a new element is available.
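    A tiny sketch of that blocking behavior, assuming a producer thread that sleeps for 200 ms before putting a (hypothetical) value. The consumer calls take() while the queue is still empty and simply waits instead of concluding that there is no more work.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class TakeBlocks {
    static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);    // simulate a slow request before any data exists
                queue.put("address"); // hypothetical value
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        // The queue is empty at this point; take() waits until the producer delivers.
        String value = queue.take();
        producer.join();
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("took: " + demo());
    }
}
```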

    So the overall flow should look like this:

    1. Start a pool of producer tasks, as you are doing right now.
    2. Let the producers put data into the blocking queue (you already do this).
    3. Start some (I would say a fixed) number of consumers.
    4. Let the consumers call queue.take() on the queue. This makes each consumer wait automatically for new data and take it as soon as it becomes available.
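    Those four steps could be sketched roughly as below. This is one possible shape, not the answerer's code, and it makes several assumptions: the file reading is replaced by a List of addresses, the two HTTP calls are replaced by a hypothetical Predicate (for requestReturn200) and Consumer (for doOtherHTTPReqeust), and shutdown uses a "poison pill" sentinel, a common technique the answer does not mention, so that each consumer knows when to leave its take() loop. List.of needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

class Pipeline {
    // Poison pill: tells a consumer to stop. Assumed never to be a real address.
    private static final String POISON_PILL = "<<no more addresses>>";

    static void run(List<String> addresses,
                    Predicate<String> returns200,   // hypothetical stand-in for requestReturn200
                    Consumer<String> secondRequest, // hypothetical stand-in for doOtherHTTPReqeust
                    int producerThreads,
                    int consumerThreads) throws InterruptedException {
        BlockingQueue<String> subQueue = new LinkedBlockingQueue<>(1_000); // bounded hand-off
        ExecutorService producers = Executors.newFixedThreadPool(producerThreads);
        ExecutorService consumers = Executors.newFixedThreadPool(consumerThreads);

        // Steps 3 and 4: a fixed number of consumers, each blocking on take().
        for (int i = 0; i < consumerThreads; i++) {
            consumers.submit(() -> {
                try {
                    while (true) {
                        String address = subQueue.take(); // waits for data; no size() checks
                        if (POISON_PILL.equals(address)) {
                            return; // producers are finished, so this consumer exits
                        }
                        try {
                            secondRequest.accept(address);
                        } catch (RuntimeException e) {
                            System.out.println("Error consuming " + address);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Steps 1 and 2: producer tasks check each address and queue the good ones.
        for (String address : addresses) {
            producers.submit(() -> {
                try {
                    if (returns200.test(address)) {
                        subQueue.put(address);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        producers.shutdown();
        producers.awaitTermination(1, TimeUnit.HOURS); // wait until every producer task has finished

        for (int i = 0; i < consumerThreads; i++) {
            subQueue.put(POISON_PILL); // one pill per consumer, queued after all real items
        }
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.HOURS);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> done = Collections.synchronizedList(new ArrayList<>());
        run(List.of("a", "bad-b", "c", "d"), address -> !address.startsWith("bad"), done::add, 4, 2);
        System.out.println("Second request sent for: " + done);
    }
}
```

    The pills are only queued after awaitTermination() returns, so no consumer can stop while real addresses are still on their way; the order of the printed addresses depends on thread scheduling.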

    I will leave aside the point that creating 200 threads is insane and, at least in your case IMHO, misses the whole purpose of producer/consumer setups and task pools. The idea is to use a small number of threads, since threads are heavyweight, to work through plenty of queued tasks. But that is a discussion for another time.