
Java Executor partial shutdown


Suppose an application has one shared Executor. Many parts of the application use it for computations, and each computation can be cancelled. To cancel everything at once I can call shutdown() or shutdownNow() on the executor (both methods are declared on ExecutorService, not on the plain Executor interface).

But I want to shut down only some of the tasks in the executor. Unfortunately I have no access to the Future objects; they are a private part of the computation's implementation (the computation is actually backed by the actor framework jetlang).

I want something like an Executor wrapper that I could pass to the computation and that would be backed by the real Executor. Something like this:

// main application executor
Executor applicationExecutor = Executors.newCachedThreadPool();

// starting computation
ExecutorWrapper computationExecutor = new ExecutorWrapper(applicationExecutor);
Computation computation = new Computation(computationExecutor);
computation.start();

// cancelling computation
computation.cancel();
// shutting down only computation tasks
computationExecutor.shutdown();

// applicationExecutor remains running and happy

Or any other idea?


Solution

  • For anyone who wants a happy ending: here is the final solution, partially based on Ivan Sopov's answer. Luckily, jetlang runs its tasks through the plain Executor interface only (not ExecutorService), so I wrote a wrapper class that can stop just the tasks submitted through that wrapper.

    // Imports for the enclosing source file.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    static class StoppableExecutor implements Executor {
        private final ExecutorService executor;
        // Futures of tasks submitted through this wrapper; guarded by synchronized (futures).
        private final List<Future<?>> futures = new ArrayList<>();
        // Also guarded by synchronized (futures), so stop() and execute() cannot race.
        private boolean stopped;

        public StoppableExecutor(ExecutorService executor) {
            this.executor = executor;
        }

        void stop() {
            synchronized (futures) {
                stopped = true;
                for (Future<?> future : futures) {
                    // cancel(true) interrupts a running task and is a no-op for a finished one.
                    future.cancel(true);
                }
                futures.clear();
            }
        }

        @Override
        public void execute(Runnable command) {
            synchronized (futures) {
                if (stopped) {
                    return; // tasks submitted after stop() are silently dropped
                }
                // Drop finished futures so the list does not grow without bound.
                // isDone() also returns true for cancelled tasks.
                futures.removeIf(Future::isDone);
                futures.add(executor.submit(command));
            }
        }
    }
    

    Using this is pretty straightforward:

    ExecutorService service = Executors.newFixedThreadPool(5);
    StoppableExecutor executor = new StoppableExecutor(service);
    
    // doing some actor stuff with executor instance
    PoolFiberFactory factory = new PoolFiberFactory(executor);
    
    // stops only the tasks submitted through this executor instance;
    // the underlying executor service keeps running its other tasks
    executor.stop();
    

    That's all. It works nicely.
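
To make the behaviour described above easier to check without jetlang, here is a minimal, self-contained sketch. It assumes a condensed copy of the wrapper and replaces the actor code with two plain Runnables: one submitted through the wrapper and one submitted directly to the shared pool. The names StoppableExecutorDemo and runScenario, and the latches used for coordination, are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StoppableExecutorDemo {

    /** Condensed copy of the wrapper; it only tracks tasks it submitted itself. */
    static class StoppableExecutor implements Executor {
        private final ExecutorService delegate;
        private final List<Future<?>> futures = new ArrayList<>();
        private boolean stopped; // guarded by synchronized (futures)

        StoppableExecutor(ExecutorService delegate) {
            this.delegate = delegate;
        }

        void stop() {
            synchronized (futures) {
                stopped = true;
                for (Future<?> f : futures) {
                    f.cancel(true); // interrupts the task if it is running
                }
                futures.clear();
            }
        }

        @Override
        public void execute(Runnable command) {
            synchronized (futures) {
                if (stopped) {
                    return;
                }
                futures.removeIf(Future::isDone);
                futures.add(delegate.submit(command));
            }
        }
    }

    /** Hypothetical helper: returns {wrapped task was interrupted, direct task completed}. */
    static boolean[] runScenario() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        StoppableExecutor wrapper = new StoppableExecutor(service);
        CountDownLatch wrappedStarted = new CountDownLatch(1);
        CountDownLatch wrappedInterrupted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch directCompleted = new CountDownLatch(1);
        try {
            // Submitted through the wrapper: blocks until it is interrupted.
            wrapper.execute(() -> {
                wrappedStarted.countDown();
                try {
                    new CountDownLatch(1).await(); // never counted down
                } catch (InterruptedException e) {
                    wrappedInterrupted.countDown();
                }
            });
            // Submitted directly to the shared pool: waits until released.
            service.execute(() -> {
                try {
                    release.await();
                    directCompleted.countDown();
                } catch (InterruptedException e) {
                    // would mean stop() affected a task it does not own
                }
            });

            wrappedStarted.await();
            wrapper.stop(); // cancels only the wrapper's task
            boolean interrupted = wrappedInterrupted.await(5, TimeUnit.SECONDS);
            release.countDown(); // the shared pool is still usable
            boolean completed = directCompleted.await(5, TimeUnit.SECONDS);
            return new boolean[] { interrupted, completed };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = runScenario();
        System.out.println("wrapped task interrupted: " + result[0]);
        System.out.println("direct task completed:    " + result[1]);
    }
}
```

One caveat worth keeping in mind: Future.cancel(true) only delivers an interrupt to the running thread. A task that never blocks on an interruptible call and never checks Thread.interrupted() will keep running after stop().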