Search code examples
javamultithreadingthreadpoolexecutorservicejava-threads

How to submit List<LinkedBlockingQueue<Long>> to ThreadPoolExecutor and each thread will pick one LinkedBlockingQueue and execute it parallel


I submitting List of LinkedBlockingQueue of the Long type to ThreadPoolExecutor and condition should be as each thread pick LinkedBlockingQueue of long and execute in parallel

This is my Method Logic

public void doParallelProcess() {

    List<LinkedBlockingQueue<Long>> linkedBlockingQueueList = splitListtoBlockingQueues();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
    Long initial = System.currentTimeMillis();
    try {

        System.out.println("linkedBlockingQueueList begin size is " + linkedBlockingQueueList.size() + "is empty"
                + linkedBlockingQueueList.isEmpty());

        while (true) {
            linkedBlockingQueueList.parallelStream().parallel().filter(q -> !q.isEmpty()).forEach(queue -> {
                Long id = queue.poll();
                MyTestRunnable runnab = new MyTestRunnable(id);
                executor.execute(runnab);
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
            });

            System.out.println("linkedBlockingQueueList end size is " + linkedBlockingQueueList.size() + "is empty"
                    + linkedBlockingQueueList.isEmpty());

            System.out.println("executor service " + executor);

            if (executor.getCompletedTaskCount() == (long) mainList.size()) {
                break;
            }

            while (executor.getActiveCount() != 0) {
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
                Thread.sleep(1000L);
            }

        }
    } catch (Exception e) {
    } finally {
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
    }
} `

How to submit a list of LinkedBlockingQueue to an individual thread example :

  1. List<LinkedBlockingQueue<Long>> each LinkedBlockingQueue contains 50 queue data
  2. size of List<LinkedBlockingQueue<Long>> is 50
  3. each thread should pick one LinkedBlockingQueue<Long> and execute 50 queue tasks.

Solution

  • The input to an ExecutorService is either Runnable or Callable. Any task you submit needs to implement one of those two interfaces. If you want to submit a bunch of tasks to a thread pool and wait until they are all complete, then you can use the invokeAll method and loop over the resulting Futures, calling get on each: see this informative answer to a similar question.

    You do not need to batch your input tasks into groups, though. You never want an executor service to have idle threads while there is still work left to do! You want it to be able to grab the next task as soon as resources free up, and batching in this fashion runs contrary to that. Your code is doing this:

    while non-empty input lists exist {
        for each non-empty input list L {
            t = new Runnable(L.pop())
            executor.submit(t)
        }
        while (executor.hasTasks()) {
            wait
        }
    }
    

    Once one of those tasks completes, that thread should be free to move on to other work. But it won't because you wait until all N tasks complete before you submit any more. Submit them all at once with invokeAll and let the executor service do what it was built to do.