I'm working on a program that processing a large stream of items and sends each result to a REST server. That server has a rate limit of 2000 requests per hour, so the program must pause a while between processing two items. Items failed to process should be presented to the user afterwards.
It all worked fine, until I discovered that shutdownNow()
isn't working when the program is closed. The UI closes but the executor keeps working. Underneath a brief summary of the code.
ExecutorService exec = Executors.newSingleThreadExecutor();
SomeProcessor p = new SomeProcessor();
void process() {
exec.submit(() -> {
Stream<SomeObject> s = ...
List<SomeObject> failed = p.process(s);
// show failed in UI
};
}
void exit() {
exec.shutdownNow();
}
And the SomeProcessor
class:
List<SomeObject> process(Stream<SomeObject> s) {
List<SomeObject> failed = s
.sequential()
.filter(o -> !ignore(o)) // ignore irrelevant items
.peek(o -> pause()) // make sure not to exceed server's rate limit
.filter(o -> !process(o)) // keep items failed to process
.collect(Collectors.asList());
return failed;
}
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
boolean process(SomeObject o) {
if (Thread.interrupted()) // make task interruptible
// *** but then what? ***
try {
// process o and send result to server
return true;
} catch (SomeException e) {
return false;
}
}
I guess that shutdownNow()
wasn't working because the task isn't interruptible. So I'm trying to make the task interruptible, but I don't know what it should look like. Any ideas?
And a bonus question too. The pause()
method does what it should do. Still I'd rather use something like ScheduledThreadPoolExecutor.scheduleAtFixedRate(.)
, but then processing a stream of tasks. Does anything exist like that?
Thanks for any help!
Look at your pause
method:
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
You are already detecting interruption at this point but react on it by setting the interrupt state of the thread again, or at least trying to do so, as Thread.interrupted()
should be Thread.currentThread().interrupt()
to achieve this, which would be fine, if you do not want to support interruption, but here it is counter-productive. It will cause your next sleep
call to throw an InterruptedException
immediately, which you handle the same way, and so on. As a result, you’re not only proceeding with the processing of the remaining elements, you’re doing it without the sleeping between the elements.
When you change the method to
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
throw new IllegalStateException("interrupted");
}
}
interruption will terminate your stream operation with the IllegalStateException
. For clarity, you may define your own exception type (extending RuntimeException
) for this scenario, distinguishable from all other exception types.