I am trying to integrate Multithreading with FileWatcher service in java. i.e., I am constantly listening to a particular directory -> whenever a new file is created, I need to spawn a new thread which processes the file (say it prints the file contents). I kind of managed to write a code which compiles and works (but not as expected). It works sequentially meaning file2 is processed after file1 and file 3 is processed after file 2. I want this to be executed in parallel.
Adding the code snippet:
while(true) {
WatchKey key;
try {
key = watcher.take();
Path dir = keys.get(key);
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
if(kind == StandardWatchEventKinds.ENTRY_CREATE){
boolean valid = key.reset();
if (!valid) {
break;
}
log.info("New entry is created in the listening directory, Calling the FileProcessor");
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path newFileCreatedResolved = dir.resolve(ev.context());
try{
FileProcessor processFile = new FileProcessor(newFileCreatedResolved.getFileName().toString());
Future<String> result = executor.submit(processFile);
try {
System.out.println("Processed File" + result.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
//executor.shutdown(); add logic to shut down
}
}
}
}
}
and the FileProcessor class
public class FileProcessor implements Callable <String>{
FileProcessor(String triggerFile) throws FileNotFoundException, IOException{
this.triggerFile = triggerFile;
}
public String call() throws Exception{
//logic to write to another file, this new file is specific to the input file
//returns success
}
What is happening now -> If i transfer 3 files at a time, they are sequentially. First file1 is written to its destination file, then file2, file3 so on.
Am I making sense? Which part I need to change to make it parallel? Or Executor service is designed to work like that.
The call to Future.get()
is blocking. The result isn't available until processing is complete, of course, and your code doesn't submit another task until then.
Wrap your Executor
in a CompletionService
and submit()
tasks to it instead. Have another thread consume the results of the CompletionService
to do any processing that is necessary after the task is complete.
Alternatively, you can use the helper methods of CompletableFuture
to set up an equivalent pipeline of actions.
A third, simpler, but perhaps less flexible option is simply to incorporate the post-processing into the task itself. I demonstrated a simple task wrapper to show how this might be done.