Search code examples
java multithreading timeout callable countdownlatch

How to wait until a few callable tasks complete and then execute


I want to execute some processors on two threads. A few of them are independent and can run at any time, but a few of them have dependencies. Whenever the order of execution reaches such a dependent processor, I want to check whether all previously submitted callable tasks have finished executing; the processors that follow it should be executed only once the current one has completed.

The following is the main thread method:

/**
 * Feeds the processors of {@code origOrderedList} to two {@code ThreadedDepProcessor}
 * workers through the shared {@code processorQueue}.
 *
 * <p>Processors that are present in the (locally created, initially empty)
 * parallel-execution map are simply queued. Every other processor is treated as a
 * barrier: for any processor after the first, the method waits until
 * {@code threadStatusMap} no longer contains {@code false} and until the unfinished
 * worker futures return before queueing it; for the first processor it queues first
 * and waits afterwards.
 *
 * <p>Changes compared with the previous revision:
 * <ul>
 *   <li>the listener is now registered on the second worker as well (it was registered
 *       twice on the first one);</li>
 *   <li>the {@code finally} block no longer dereferences a {@code null} executor and
 *       only awaits the latch when both workers were actually submitted, so a failure
 *       during set-up cannot hang or mask the original exception;</li>
 *   <li>the interrupt flag is restored when the latch wait is interrupted;</li>
 *   <li>the duplicated wait/drain code lives in private helpers.</li>
 * </ul>
 *
 * @param exportGraphInp    graph handed to both workers and returned unchanged by reference
 * @param exportContextnInp export context handed to both workers
 * @return {@code exportGraphInp}
 * @throws WTException if a worker interaction fails; checked exceptions other than
 *                     {@code WTException} are wrapped in a {@code WTException}
 */
PackageExportGraph executeMultiThread(PackageExportGraph exportGraphInp, PackageExportContext exportContextnInp)
            throws WTException {
        Map<PackageExportDependencyProcessor, Boolean> processorToParallelExecutionMap = new LinkedHashMap<>();
        this.processorQueue = new LinkedBlockingQueue<>();

        ExecutorService execService = null;
        // True only once every worker has been handed to the executor; the latch can
        // only ever reach zero in that case.
        boolean workersSubmitted = false;
        try {
            int threads = 2;

            countDownLatch = new CountDownLatch(threads);
            execService = ExecutorServiceFactory.getDefault().newExecutorService(threads, true);

            ThreadedDepProcessor thread1 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, false);
            threadList.add(thread1);
            thread1.addListener(this);

            ThreadedDepProcessor thread2 = new ThreadedDepProcessor(
                    exportGraphInp, countDownLatch,
                    processorToParallelExecutionMap, processorQueue, exportContextnInp, false);
            threadList.add(thread2);
            thread2.addListener(this);

            List<Future<LinkedBlockingQueue>> futureList = new ArrayList<>();
            for (ThreadedDepProcessor thread : threadList) {
                // Raw Future kept on purpose: the Callable type argument of
                // ThreadedDepProcessor is not visible from this method.
                Future f = execService.submit(thread);
                System.out.println("f " + f);
                futureList.add(f);
            }
            workersSubmitted = true;

            int currentidx = 0;
            for (PackageExportDependencyProcessor processor : origOrderedList) {
                if (!processorToParallelExecutionMap.containsKey(processor)) {
                    System.out.println(" parallel threadStatusMap values 1 - " + threadStatusMap.values());
                    System.out.println("Adding parallel - " + processor);
                    if (currentidx > 0) {
                        // Barrier: let everything queued so far finish before queueing this one.
                        waitForIdleWorkers();
                        Thread.sleep(2000);
                        awaitPendingFutures(futureList);
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                    } else {
                        // First processor: nothing can be in flight yet, so queue it and then wait.
                        processorQueue.put(processor);
                        Thread.sleep(2000);
                        awaitPendingFutures(futureList);
                        waitForIdleWorkers();
                    }

                    for (ThreadedDepProcessor thread : threadList) {
                        System.out.println("Finished adding dependents" + thread.finishedAddingDependents.get());
                    }
                } else {
                    System.out.println("Adding non-parallel - " + processor);
                    processorQueue.put(processor);
                }
                currentidx++;
            }
        } catch (WTException | RuntimeException exc) {
            // Rethrown untouched so these are not wrapped by the generic handler below;
            // the executor is shut down in the finally block.
            throw exc;
        } catch (Exception exc) {
            throw new WTException(exc);
        } finally {
            System.out.println("shutting down");
            if (Objects.nonNull(execService)) {
                if (workersSubmitted) {
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Preserve the interruption for callers further up the stack.
                        Thread.currentThread().interrupt();
                    }
                }
                execService.shutdown();
            }
        }
        return exportGraphInp;

    }

    /**
     * Polls {@code threadStatusMap} once per second until it no longer contains a
     * {@code false} value.
     *
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private void waitForIdleWorkers() throws InterruptedException {
        while (threadStatusMap.containsValue(false)) {
            System.out.println("Waiting");
            System.out.println("threadStatusMap values - " + threadStatusMap.values());
            Thread.sleep(1000);
        }
    }

    /**
     * Waits up to ten seconds on every future that has not completed yet and prints
     * its result; futures that are already done are skipped.
     *
     * @param futureList futures returned when the workers were submitted
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws java.util.concurrent.ExecutionException if a worker terminated with an exception
     * @throws java.util.concurrent.TimeoutException if a worker does not finish within ten seconds
     */
    private void awaitPendingFutures(List<Future<LinkedBlockingQueue>> futureList)
            throws InterruptedException, java.util.concurrent.ExecutionException,
            java.util.concurrent.TimeoutException {
        System.out.println("Size - " + futureList.size());
        for (Future<LinkedBlockingQueue> f : futureList) {
            System.out.println("futureList is done " + f.isDone());
            System.out.println("Getting future Object");
            if (!f.isDone()) {
                Object o = f.get(10, TimeUnit.SECONDS);
                System.out.println(o);
            }
        }
    }

And this is the callable:

 /**
  * Drains {@code processorQueue}, calling {@code addDependentObjects} on each
  * processor taken from it, and stops as soon as the queue is found empty.
  *
  * <p>The queue is read with the non-blocking {@code poll()} instead of the former
  * {@code isEmpty()} + {@code take()} pair. With several consumers on the same queue
  * that pair is a check-then-act race: a worker could see a non-empty queue, lose the
  * element to another worker and then block in {@code take()} forever. It also made
  * the {@code null} branch unreachable, because {@code take()} never returns
  * {@code null}.
  *
  * <p>NOTE(review): a worker that starts before the producer has queued anything
  * still sees an empty queue and exits immediately, as it did before. Keeping
  * workers alive for the whole run would need an explicit end-of-work signal agreed
  * with the producer — confirm the intended protocol.
  *
  * <p>Any exception is logged and swallowed; the latch is counted down in all cases.
  *
  * @return the shared processor queue
  * @throws WTException declared for interface compatibility; exceptions raised while
  *                     processing are caught and logged inside this method
  * @throws InterruptedException declared for interface compatibility
  */
 @Override
        public LinkedBlockingQueue call() throws WTException, InterruptedException {
            try {
                System.out.println("Started - ");
                isThreadStarted = true;
                while (true) {
                    // poll() returns null when the queue is empty instead of blocking.
                    nextEligible = processorQueue.poll();
                    if (Objects.isNull(nextEligible)) {
                        finishedAddingDependents.set(true);
                        break;
                    }
                    System.out.println("calling addDependentObjects for processor - " + nextEligible);
                    nextEligible.addDependentObjects(exportGraph, exportContext);
                    nextEligible = null;
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to the executor that runs this task.
                    Thread.currentThread().interrupt();
                }
                System.out.println("Error occured " + e);
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
                System.out.println("countDownLatch now - " + countDownLatch.getCount());
            }
            return processorQueue;
        }
    }

I tried checking while(future.isDone()), but it goes into an infinite loop. I want to check whether the thread/callable execution has started or not.

If it has started, then when a serial processor is about to execute, I want to wait until all existing tasks have finished, then start its execution, and not pick up the next processor until its execution has completed.


Solution

  • What I did was maintain one synchronized collection that records the execution status of each processor; based on that, we can either wait or go ahead.