java, executorservice, java-threads

Is it possible to block other runnables while executing the first one, using ExecutorService in Java?


I'm trying to process a relatively large Stream of Lists across multiple threads, using an ExecutorService. The method looks something like this.

public void initMigration() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {            
        streamOfLists.forEach(record4List -> {
            Runnable runnable = () -> {
                try {
                    final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                    LOGGER.info("Invoking POST with payload {}", attachments);
                    Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                    restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
                } catch (ExceptionA | ExceptionB e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        });
    }
    LOGGER.info("Shutting down the ExecutorService");
    executorService.shutdown();
}

For each List in the Stream, a Runnable is created and submitted to the ExecutorService, and this seems to work fine. What I'd really like to do now is make the ExecutorService run the Runnable built from the first List in the Stream on its own, holding back all other Runnables until it has finished, and then run the remaining Runnables in parallel. Is there a way to do this?


Solution

  • You can take the first Runnable, run it directly on the calling thread, and only then submit the remaining Runnables to the ExecutorService.

        try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
            Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
            if (it.hasNext()) {
                // Run the first list synchronously; nothing else starts until it returns.
                List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
                Runnable runnable = new MyRunnable(record4List);
                runnable.run();
            }
            while (it.hasNext()) {
                // The remaining lists are handed to the pool and run in parallel.
                List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
                Runnable runnable = new MyRunnable(record4List);
                executorService.submit(runnable);
            }
        }
    

    where

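    // Assumed to be an inner class, so that LOGGER and restClient are in scope.
    class MyRunnable implements Runnable {
        private final List<Record4<Integer, Integer, String, byte[]>> record4List;
    
        MyRunnable(List<Record4<Integer, Integer, String, byte[]>> record4List) {
            this.record4List = record4List;
        }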
    
        @Override
        public void run() {
            try {
                final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                LOGGER.info("Invoking POST with payload {}", attachments);
                Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
            } catch (ExceptionA | ExceptionB e) {
                e.printStackTrace();
            }
        }
    }
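
To try the pattern in isolation, here is a minimal self-contained sketch. It is not the original migration code: the class name `FirstThenParallelDemo`, the `process` and `handle` methods, and the use of `List<Integer>` in place of the jOOQ records are all placeholders invented for illustration. It runs the first list on the calling thread, submits the rest to a cached thread pool, and then waits for the pool to finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

class FirstThenParallelDemo {

    // Hypothetical stand-in for the real per-list work (POST + upload).
    static void handle(List<Integer> batch, List<String> completed) {
        completed.add("batch" + batch);
    }

    // Runs the first list synchronously, then the remaining lists in parallel.
    // Returns the completion order, recorded here only for demonstration.
    static List<String> process(Stream<List<Integer>> streamOfLists) {
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executorService = Executors.newCachedThreadPool();
        try (Stream<List<Integer>> stream = streamOfLists) {
            Iterator<List<Integer>> it = stream.iterator();
            if (it.hasNext()) {
                // Blocks the calling thread until the first list is done.
                handle(it.next(), completed);
            }
            while (it.hasNext()) {
                List<Integer> batch = it.next();
                executorService.submit(() -> handle(batch, completed));
            }
        } finally {
            // Stop accepting new tasks; already submitted tasks still run.
            executorService.shutdown();
        }
        try {
            // Wait for the parallel tasks so the caller sees all results.
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> order = process(Stream.of(
                Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5)));
        System.out.println("First completed: " + order.get(0)); // batch[1, 2]
        System.out.println("Total completed: " + order.size()); // 3
    }
}
```

Only the first entry of the completion order is guaranteed; the remaining lists may finish in any order. Note that `shutdown()` alone does not wait for submitted tasks, so `awaitTermination` is needed if the caller has to know when everything is done.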