I'm trying to process a relatively huge Stream
of List
in multiple threads, using an ExecutorService
. The method looks something like this.
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
Basically, what I'm trying to do here is, for each List
in the Stream
, a Runnable
is being created and submitted to the ExecutorService
. It seems to be working alright. But, what I really wanna do now, is to see if there's any way I can make the ExecutorService
run the first Runnable
obtained from the first List
in the Stream
while blocking other Runnables
until
its execution, and continue running other Runnables
(in parallel) after that. Could really use some help with this.
You can take first Runnable, execute it, and only then submit other Runnables.
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
if (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
runnable.run();
}
while (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
executorService.submit(runnable);
}
}
where
class MyRunnable implements Runnable {
Record4<Integer, Integer, String, byte[]> record4List;
MyRunnable(Record4<Integer, Integer, String, byte[]> record4List) {
this.record4List = record4List;
}
@Override
public void run() {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
}
}