java, file, asynchronous, completable-future

Program doesn't exit when using CompletableFuture while reading a file with FileReader


Background

I'm building a data pipeline in which each received message is to be processed asynchronously. I'm trying to simulate that behavior by:

  • Reading messages from a file
  • Processing each message with CompletableFuture

Code

 BufferedReader reader = null;
 // Fixed pool of four worker threads that runs the async stages
 ExecutorService service = Executors.newFixedThreadPool(4);
 try {
     String filepath = str[0];
     FileReaderAsync fileReaderAsync = new FileReaderAsync();
     reader = new BufferedReader(new FileReader(filepath));
     Random r = new Random();
     String line;
     while ((line = reader.readLine()) != null) {
         Integer val = Integer.valueOf(line.trim());
         // Random delay of 0-400 ms to simulate messages arriving over time
         int randomInt = r.nextInt(5);
         Thread.sleep(randomInt * 100);
         CompletableFuture.supplyAsync(() -> {
             System.out.println("Square : " + val);
             return val * val;
         }, service)
             .thenApplyAsync(value -> {
                 System.out.println(":::::::Double : " + value);
                 return 2 * value;
             }, service)
             .thenAccept(value -> {
                 System.out.println("Answer : " + value);
             });
     }
 } catch (Exception e) {
     e.printStackTrace();
 } finally {
     try {
         // reader is still null if opening the file failed
         if (reader != null) {
             reader.close();
         }
     } catch (Exception e) {
         // keep the original exception as the cause
         throw new RuntimeException(e);
     }
 }

For simplicity, I'm pasting only the body of the main method; assume the variables are declared and in scope.

Issues

Code

  • The program works fine but does not exit. When I comment out the async logic and only read the file, it works and exits normally.

Design

  • In a streaming pipeline, will this async model work for each incoming message if every message is handed to a CompletableFuture for processing?
  • Or will it block until the current message has been processed?
  • Is it necessary to introduce another queue and consume from it, instead of consuming incoming messages as they flow in?

Edit 1: Added

public void shutdown() {
   service.shutdown();
}

and

  reader.close();
  fileReaderAsync.shutdown();

which did the trick.


Solution

  • Problem

    You're using a thread pool created by:

    ExecutorService service = Executors.newFixedThreadPool(4);
    

    By default, a pool created this way uses non-daemon threads. As documented by java.lang.Thread:

    When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:

    • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
    • All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

    In other words, any non-daemon thread that is still alive will also keep the JVM alive.
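
    To see this concretely, the following minimal sketch asks a worker thread of a default fixed pool whether it is a daemon. The class and method names (DaemonCheck, defaultPoolWorkerIsDaemon) are made up for illustration and are not part of the question's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DaemonCheck {

    // Hypothetical helper: reports whether a worker of a default fixed pool is a daemon thread.
    static boolean defaultPoolWorkerIsDaemon() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // The task runs on a pool worker and inspects that worker thread.
            Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
            return pool.submit(task).get();
        } finally {
            // Without this call the idle, non-daemon worker would keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("daemon worker? " + defaultPoolWorkerIsDaemon()); // prints "daemon worker? false"
    }
}
```

    If the pool.shutdown() call is removed, the same program prints its line and then hangs, which matches the behavior described in the question.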


    Solution

    There are at least two solutions to your problem.

    Shutdown the Thread Pool

    You can shut down the thread pool when you're finished with it.

    service.shutdown(); // Calls ExecutorService#shutdown()
    

    The #shutdown() method starts a graceful shutdown. It prevents any new tasks from being submitted but allows any already-submitted tasks to complete. Once all tasks are complete the pool will terminate (i.e. all threads will be allowed to die). If you want to wait for all tasks to complete before continuing then you can call #awaitTermination(long,TimeUnit) after calling #shutdown() / #shutdownNow().
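
    As a rough sketch of that sequence applied to a pipeline like the one in the question: the class name, the processAll method, and the 5-second timeout are assumptions made for illustration. One caveat worth hedging: a stage added with thenApplyAsync is handed to the executor only when the previous stage completes, so if the pool has already been shut down by then, that stage can be rejected. For that reason the sketch waits for every future before calling #shutdown().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {

    // Hypothetical helper: runs a square-then-double pipeline per input,
    // waits for every pipeline to finish, then shuts the pool down.
    static List<Integer> processAll(List<Integer> inputs) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (Integer val : inputs) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> val * val, service)
                        .thenApplyAsync(square -> 2 * square, service));
            }
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> future : futures) {
                results.add(future.join()); // blocks until this pipeline has completed
            }
            return results;
        } finally {
            service.shutdown(); // no new tasks accepted; already-submitted tasks still run
            // Assumed timeout of 5 seconds; pick whatever suits the workload.
            if (!service.awaitTermination(5, TimeUnit.SECONDS)) {
                service.shutdownNow(); // stop waiting and try to interrupt the workers
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(Arrays.asList(1, 2, 3))); // prints [2, 8, 18]
    }
}
```

    Once main returns here, the pool has terminated and no non-daemon worker threads remain, so the JVM can exit.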

    If you want to try to shut down the pool immediately, call #shutdownNow(). It attempts to stop any currently executing tasks (typically by interrupting their threads), and any submitted-but-not-yet-started tasks are simply not executed (they are in fact returned to you in a list). Note that whether a task responds to cancellation depends on how that task was implemented.
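
    A small sketch of that behavior, using a single-thread pool so that the queued tasks are easy to count. The class name, the cancelPending method, and the sleep and timeout durations are assumptions for illustration only.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {

    // Hypothetical helper: returns how many queued tasks never got to start.
    static int cancelPending() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        // Occupies the only worker thread; Thread.sleep responds to interruption.
        service.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow()
            }
        });

        // These two tasks wait in the queue behind the sleeping task.
        service.execute(() -> System.out.println("queued task 1"));
        service.execute(() -> System.out.println("queued task 2"));

        started.await(); // make sure the first task is actually running
        List<Runnable> neverStarted = service.shutdownNow();
        service.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks that never started: " + cancelPending()); // prints "Tasks that never started: 2"
    }
}
```

    The sleeping task ends early only because Thread.sleep throws InterruptedException; a task that ignores interruption would keep running after #shutdownNow() returns.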

    Use Daemon Threads

    A daemon thread will not keep the JVM alive. You can configure the thread pool to use daemon threads via a ThreadFactory.

    ExecutorService service = Executors.newFixedThreadPool(4, r -> {
      Thread t = new Thread(r); // may want to name the threads
      t.setDaemon(true);
      return t;
    });
    

    Note that you should still shut down the thread pool when you're finished with it, regardless.
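
    Following up on the comment about naming the threads, here is one possible sketch of a factory that produces named daemon workers. The newDaemonPool helper and the "pipeline" prefix are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonPoolSketch {

    // Hypothetical helper: fixed pool whose workers are named daemon threads.
    static ExecutorService newDaemonPool(int size, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    // Runs one task on the pool and describes the worker thread that executed it.
    static String describeWorker() {
        ExecutorService service = newDaemonPool(4, "pipeline");
        try {
            return CompletableFuture.supplyAsync(() -> {
                Thread t = Thread.currentThread();
                return t.getName() + " daemon=" + t.isDaemon();
            }, service).join();
        } finally {
            service.shutdown(); // still shut down explicitly when finished
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorker()); // prints "pipeline-1 daemon=true"
    }
}
```

    Keep in mind that the JVM does not wait for daemon threads, so work still running on them when the last non-daemon thread ends is simply abandoned. Joining the futures, as done above, avoids losing results that way.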