In Java,
Can i input a callable (that runs forever) to a StructuredTaskScope,
and use joinUntil function to set a timeout,
and expect it to be stop automatically?
Please see the implementation here
Here is the callable that runs forever
Callable<String> callableBlocking = () -> {
d.ci("fetchFail.callableBlocking", "begin");
while (true) {
d.ci("fetchFail.callableBlocking", "while");
TS_ThreadWait.of(Duration.ofSeconds(1));
}
// d.ci("fetchFail.callableBlocking", "never ends");
// return "4";
};
----------- UPDATE ---------------------
Please find the simplified code to recreate the situation
Consumer<String> log = msg -> System.out.println("log -> " + msg);
Consumer<Duration> wait = duration -> {
try {
Thread.sleep(duration);
} catch (Exception e) {
}
};
Callable<String> callableBlocking = () -> {
log.accept("fetchFail.callableBlocking.begin");
while (true) {
log.accept("fetchFail.callableBlocking.while");
wait.accept(Duration.ofSeconds(1));
}
// log.accept("fetchFail.callableBlocking.neverEnds");
// return "never returns";
};
var scope = new StructuredTaskScope.ShutdownOnFailure();
try {
var future = scope.fork(callableBlocking);
scope.joinUntil(Instant.now().plusSeconds(1));
scope.throwIfFailed();
log.accept("result: " + future.resultNow());
} catch (InterruptedException | TimeoutException | ExecutionException e) {
log.accept(e.getClass().getName() + ":" + e.getMessage());
if (e instanceof TimeoutException) {
scope.shutdown();
log.accept("INFO: shutdown triggered");
}
} finally {
scope.close();
}
CONSOLE:
tugal@TULAP /cygdrive/c/me/codes/com.tugalsan/tst/com.tugalsan.tst.thread
$ java --enable-preview --add-modules jdk.incubator.concurrent -jar target/com.tugalsan.tst.thread-1.0-SNAPSHOT-jar-with-dependencies.jar
WARNING: Using incubator modules: jdk.incubator.concurrent
log -> fetchFail.callableBlocking.begin
log -> fetchFail.callableBlocking.while
log -> fetchFail.callableBlocking.while
log -> java.util.concurrent.TimeoutException:null
log -> INFO: shutdown triggered
log -> fetchFail.callableBlocking.while
log -> fetchFail.callableBlocking.while
log -> fetchFail.callableBlocking.while
log -> fetchFail.callableBlocking.while
...
The joinUntil
interrupts unfinished tasks but your code is not allowing interrupt properly as you are trapping InterruptedException
here:
try {
Thread.sleep(1000L);
} catch (Exception e) {
}
Just change above code to re-throw RuntimeException
or replace wait.accept(Duration.ofSeconds(1));
by Thread.sleep(1000L);
so that you don't ignore InterruptedException
:
Callable<String> callableBlocking = () -> {
while (true) {
log.accept("fetchFail.callableBlocking.while");
Thread.sleep(1000L);
}
};
You can also simplify the code without shutdown/close as try-with-resources and .ShutdownOnFailure()
deal with this for you:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
...
}