Backgroud
Building a data pipeline where each message received is to be processes asynchronously. Trying to simulate the behavior by
Code
BufferedReader reader = null;
ExecutorService service = Executors.newFixedThreadPool(4);
try {
String filepath = str[0];
FileReaderAsync fileReaderAsync = new FileReaderAsync();
reader = new BufferedReader(new FileReader(filepath));
Random r = new Random();
String line;
while ((line = reader.readLine()) != null) {
Integer val = Integer.valueOf(line.trim());
int randomInt = r.nextInt(5);
Thread.sleep(randomInt * 100);
CompletableFuture.supplyAsync(() -> {
System.out.println("Square : " + val);
return val * val;
}, service)
.thenApplyAsync(value -> {
System.out.println(":::::::Double : " + value);
return 2 * value;
}, service)
.thenAccept(value -> {
System.out.println("Answer : " + value);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
For simplicity just pasting main method code, assume variables are declared and in scope.
Issues
Code
Design
Edit 1 Added
public void shutdown() {
service.shutdown();
}
and
reader.close();
fileReaderAsync.shutdown();
which did the trick.
You're using a thread pool created by:
ExecutorService service = Executors.newFixedThreadPool(4);
Which by default is configured to use non-daemon threads. And as documented by java.lang.Thread:
When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named
main
of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:
- The
exit
method of classRuntime
has been called and the security manager has permitted the exit operation to take place.- All threads that are not daemon threads have died, either by returning from the call to the
run
method or by throwing an exception that propagates beyond therun
method.
In other words, any non-daemon thread that is still alive will also keep the JVM alive.
There are at least two solutions to your problem.
You can shutdown the thread pool when you're finished with it.
service.shutdown(); // Calls ExecutorService#shutdown()
The #shutdown()
method starts a graceful shutdown. It prevents any new tasks from being submitted but allows any already-submitted tasks to complete. Once all tasks are complete the pool will terminate (i.e. all threads will be allowed to die). If you want to wait for all tasks to complete before continuing then you can call #awaitTermination(long,TimeUnit)
after calling #shutdown()
/ #shutdownNow()
.
If you want to try and immediately shutdown the pool then call #shutdownNow()
. Any currently-executing tasks will be cancelled and any submitted-but-not-yet-started tasks are simply not executed (and are in fact returned to you in a list). Note whether a task responds to cancellation depends on how that task was implemented.
A daemon thread will not keep the JVM alive. You can configure the thread pool to use daemon threads via a ThreadFactory
.
ExecutorService service = Executors.newFixedThreadPool(4, r -> {
Thread t = new Thread(r); // may want to name the threads
t.setDaemon(true);
return t;
});
Note you should still shutdown the thread pool when finished with it, regardless.