Search code examples
javajava-streamexecutorserviceshutdown

Shutting down ExecutorService that is processing a Stream


I'm working on a program that processing a large stream of items and sends each result to a REST server. That server has a rate limit of 2000 requests per hour, so the program must pause a while between processing two items. Items failed to process should be presented to the user afterwards.

It all worked fine, until I discovered that shutdownNow() isn't working when the program is closed. The UI closes but the executor keeps working. Underneath a brief summary of the code.

ExecutorService exec = Executors.newSingleThreadExecutor();
SomeProcessor p = new SomeProcessor();

void process() {
    exec.submit(() -> {
        Stream<SomeObject> s = ...
        List<SomeObject> failed = p.process(s);
        // show failed in UI
    };
}

void exit() {
    exec.shutdownNow();
}

And the SomeProcessor class:

List<SomeObject> process(Stream<SomeObject> s) {
    List<SomeObject> failed = s
        .sequential()
        .filter(o -> !ignore(o)) // ignore irrelevant items
        .peek(o -> pause()) // make sure not to exceed server's rate limit
        .filter(o -> !process(o)) // keep items failed to process
        .collect(Collectors.asList());
    return failed;
}

void pause() {
    try {
        TimeUnit.MILLISECONDS.sleep(...);
    } catch (final InterruptedException e) {
        Thread.interrupted();
    }
}

boolean process(SomeObject o) {
    if (Thread.interrupted()) // make task interruptible
        // *** but then what? ***
    try {
        // process o and send result to server
        return true;
    } catch (SomeException e) {
        return false;
    }
}

I guess that shutdownNow() wasn't working because the task isn't interruptible. So I'm trying to make the task interruptible, but I don't know what it should look like. Any ideas?

And a bonus question too. The pause() method does what it should do. Still I'd rather use something like ScheduledThreadPoolExecutor.scheduleAtFixedRate(.), but then processing a stream of tasks. Does anything exist like that?

Thanks for any help!


Solution

  • Look at your pause method:

    void pause() {
        try {
            TimeUnit.MILLISECONDS.sleep(...);
        } catch (final InterruptedException e) {
            Thread.interrupted();
        }
    }
    

    You are already detecting interruption at this point but react on it by setting the interrupt state of the thread again, or at least trying to do so, as Thread.interrupted() should be Thread.currentThread().interrupt() to achieve this, which would be fine, if you do not want to support interruption, but here it is counter-productive. It will cause your next sleep call to throw an InterruptedException immediately, which you handle the same way, and so on. As a result, you’re not only proceeding with the processing of the remaining elements, you’re doing it without the sleeping between the elements.

    When you change the method to

    void pause() {
        try {
            TimeUnit.MILLISECONDS.sleep(...);
        } catch (final InterruptedException e) {
            throw new IllegalStateException("interrupted");
        }
    }
    

    interruption will terminate your stream operation with the IllegalStateException. For clarity, you may define your own exception type (extending RuntimeException) for this scenario, distinguishable from all other exception types.