
How to check whether all actors have finished


I'm new to the Akka toolkit. I need to run a time-consuming process on multiple files, so I created one actor per file and started the processing. I create these actors in a POJO class as follows:

public class ProcessFiles {
    private static final Logger logger = LoggerFactory.getLogger(ProcessFiles.class.getSimpleName());

    public static void main(String[] args) throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();

        logger.info("Creating actor system");
        ActorSystem system = ActorSystem.create("actor_system");

        Set<String> files = new HashSet<>();
        Stream<String> stringStream = Files.lines(Paths.get(fileName));
        stringStream.forEach(line -> files.addAll(Arrays.asList(line.split(","))));
        List<CompletableFuture<Object>> futureList = new ArrayList<>();

        files.forEach((String file) -> {
            ActorRef actorRef = system.actorOf(Props.create(ProcessFile.class, file));
            futureList.add(PatternsCS.ask(actorRef, file, DEFAULT_TIMEOUT).toCompletableFuture());
        });

        boolean isDone;
        do {
            Thread.sleep(30000);
            isDone = true;
            int count = 0;
            for (CompletableFuture<Object> future : futureList) {
                // isDone() is also true for exceptional completion and cancellation.
                if (future.isDone()) {
                    ++count;
                } else {
                    isDone = false;
                }
            }
            logger.info("Process is completed for " + count + " files out of " + files.size() + " files.");
        } while (!isDone);
        logger.info("Process is done in " + (System.currentTimeMillis() - startTime) + " ms");
        system.terminate();
    }
} 

Here, ProcessFile is the actor class. After starting all the actors, the main thread checks every 30 seconds whether they have all finished, so that the program can exit. Is there a better way to implement this?


Solution

  • I would suggest creating one more actor that keeps track of the termination of all the other actors in the system and shuts the actor system down once they have all stopped. In your application:

    The ProcessFile actor can send a PoisonPill to itself after processing the file. A WatcherActor watches each ProcessFile actor (context.watch(processFileActor)) and maintains a count of all the ProcessFile actors it has registered.

    When an actor terminates, the WatcherActor receives a Terminated message. It decrements the count and, when the count reaches 0, terminates the ActorSystem.
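
The approach described above might look roughly like the following sketch. It assumes the classic Akka Java API (2.5 or later) is on the classpath. The names WatcherExample and FileWorker and the file names are hypothetical; FileWorker stands in for the ProcessFile actor from the question. One variation from the description: the watcher creates the workers itself, so the set of running actors cannot become empty before every worker has been registered.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WatcherExample {

    /** Hypothetical stand-in for the ProcessFile actor from the question. */
    public static class FileWorker extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .match(String.class, file -> {
                    // The long-running processing of `file` would go here.
                    // Once the work is done, the actor stops itself.
                    getSelf().tell(PoisonPill.getInstance(), getSelf());
                })
                .build();
        }
    }

    /** Starts one worker per file, watches each, and terminates the system when all have stopped. */
    public static class WatcherActor extends AbstractActor {
        private final List<String> files;
        private final Set<ActorRef> running = new HashSet<>();

        public WatcherActor(List<String> files) {
            this.files = files;
        }

        @Override
        public void preStart() {
            for (String file : files) {
                ActorRef worker = getContext().actorOf(Props.create(FileWorker.class));
                getContext().watch(worker);   // request a Terminated message for this worker
                running.add(worker);
                worker.tell(file, getSelf());
            }
            if (running.isEmpty()) {
                // Nothing to wait for, so shut down immediately.
                getContext().getSystem().terminate();
            }
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .match(Terminated.class, terminated -> {
                    running.remove(terminated.getActor());
                    if (running.isEmpty()) {
                        // The last worker has stopped.
                        getContext().getSystem().terminate();
                    }
                })
                .build();
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("actor_system");
        system.actorOf(
            Props.create(WatcherActor.class, Arrays.asList("a.csv", "b.csv")), "watcher");
        // Block until the watcher has terminated the system; no polling loop is needed.
        system.getWhenTerminated().toCompletableFuture().get();
        System.out.println("All workers finished");
    }
}
```

With this arrangement the main thread only waits on the system's termination future, so the 30-second polling loop and the list of ask futures are no longer needed. If the workers must report results, they could send them to the watcher before stopping.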