java multithreading executorservice futuretask

Java multithreading - throttle submission to ExecutorService


I have a data file with thousands of rows. I am reading them and saving them in the database. I want to multi-thread this process in batches of, say, 10 rows. As I read the file, each batch of 10 rows is submitted to an ExecutorService.

ExecutorService executor = Executors.newFixedThreadPool(5);

I can do the following in a while loop until my rows run out:

 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);

But I don't want to read the entire file into memory if processing 10 rows takes a long time. I only want to submit 5 batches, wait until one of the threads returns, and then submit the next.

Let's say a thread takes 20 seconds to save the 10 records. I don't want the ExecutorService to be fed thousands of lines because the reading process keeps reading and submitting to it.

What is the best way to achieve this?


Solution

  • You can do this with a LinkedList<Future<?>> that stores futures until you've reached some pre-determined size. Here's some skeleton code that should get you most of the way there:

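    import java.util.LinkedList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    int threads = 5;
    ExecutorService service = Executors.newFixedThreadPool(threads);
    LinkedList<Future<?>> futures = new LinkedList<>();
    
    // As long as there are rows to save:
    while (moreRowsLeft()) {
        // Dump another callable onto the queue:
        futures.addLast(service.submit(new RowSavingCallable()));
    
        // If the queue is "full", wait for the oldest one to finish before
        // reading any more of the file. get() throws InterruptedException
        // and ExecutionException, which the enclosing method must handle:
        while (futures.size() >= 2 * threads) futures.removeFirst().get();
    }
    
    // All rows have been submitted but some may still be writing to the DB:
    for (Future<?> f : futures) f.get();
    
    // All rows have been saved at this point, so release the worker threads:
    service.shutdown();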
    

    You may wonder why I've allowed the number of futures to reach twice the number of threads in the pool. The extra headroom lets the executor's threads keep working on database saves while the main thread reads the file and creates more work. This helps hide the I/O cost of preparing new callables while the worker threads are busy with the database write.
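
For anyone who wants to try the idea end to end, here is a minimal runnable sketch of the same bounded-window approach. Everything specific in it is an assumption made for illustration: the class name ThrottledSubmitDemo, the process and saveBatch methods, the in-memory list standing in for the file, and the Thread.sleep standing in for the database write. The maxWindow field only exists to make the bound observable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThrottledSubmitDemo {
    // Largest number of futures tracked at once during the last run (for inspection only).
    static int maxWindow = 0;

    // Hypothetical stand-in for the database write: sleeps briefly, then reports the row count.
    static int saveBatch(List<String> batch) throws InterruptedException {
        Thread.sleep(5);
        return batch.size();
    }

    // Reads rows lazily, submits them in batches, and never tracks more than 2 * threads futures.
    static int process(Iterator<String> rows, int threads, int batchSize)
            throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        LinkedList<Future<Integer>> futures = new LinkedList<>();
        maxWindow = 0;
        int total = 0;
        try {
            while (rows.hasNext()) {
                // Pull only one batch from the source at a time.
                List<String> batch = new ArrayList<>(batchSize);
                while (rows.hasNext() && batch.size() < batchSize) {
                    batch.add(rows.next());
                }
                Callable<Integer> task = () -> saveBatch(batch);
                futures.addLast(service.submit(task));
                maxWindow = Math.max(maxWindow, futures.size());

                // Window full: block on the oldest batch before reading more rows.
                while (futures.size() >= 2 * threads) {
                    total += futures.removeFirst().get();
                }
            }
            // Everything is submitted; wait for the batches still in flight.
            for (Future<Integer> f : futures) {
                total += f.get();
            }
        } finally {
            service.shutdown();
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add("row-" + i);
        }
        int saved = process(rows.iterator(), 5, 10);
        System.out.println("saved=" + saved + ", maxWindow=" + maxWindow);
    }
}
```

With 1000 rows, 5 threads and batches of 10, this prints saved=1000, maxWindow=10, so at most 10 batches (100 rows) are held in memory at any moment. One thing to keep in mind is that the loop waits on the oldest future rather than on whichever finishes first, so a single slow batch at the head of the list can stall reading even when other workers are idle.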