Java StructuredTaskScope.joinUntil does not stop a blocking Callable


In Java,
can I pass a Callable that runs forever to a StructuredTaskScope,
use the joinUntil function to set a timeout,
and expect the task to be stopped automatically?

Please see the implementation here

Here is the callable that runs forever

Callable<String> callableBlocking = () -> {
    d.ci("fetchFail.callableBlocking", "begin");
    while (true) {
        d.ci("fetchFail.callableBlocking", "while");
        TS_ThreadWait.of(Duration.ofSeconds(1));
    }
//  d.ci("fetchFail.callableBlocking", "never ends");
//  return "4";
};

----------- UPDATE ---------------------
Please find the simplified code to recreate the situation

Consumer<String> log = msg -> System.out.println("log -> " + msg);
Consumer<Duration> wait = duration -> {
    try {
        Thread.sleep(duration);
    } catch (Exception e) {
    }
};
Callable<String> callableBlocking = () -> {
    log.accept("fetchFail.callableBlocking.begin");
    while (true) {
        log.accept("fetchFail.callableBlocking.while");
        wait.accept(Duration.ofSeconds(1));
    }
//  log.accept("fetchFail.callableBlocking.neverEnds");
//  return "never returns";
};
var scope = new StructuredTaskScope.ShutdownOnFailure();
try {
    var future = scope.fork(callableBlocking);
    scope.joinUntil(Instant.now().plusSeconds(1));
    scope.throwIfFailed();
    log.accept("result: " + future.resultNow());
} catch (InterruptedException | TimeoutException | ExecutionException e) {
    log.accept(e.getClass().getName() + ":" + e.getMessage());
    if (e instanceof TimeoutException) {
        scope.shutdown();
        log.accept("INFO: shutdown triggered");
    }
} finally {
    scope.close();
}

CONSOLE:

tugal@TULAP /cygdrive/c/me/codes/com.tugalsan/tst/com.tugalsan.tst.thread 
$ java --enable-preview --add-modules jdk.incubator.concurrent -jar target/com.tugalsan.tst.thread-1.0-SNAPSHOT-jar-with-dependencies.jar 
WARNING: Using incubator modules: jdk.incubator.concurrent 
log -> fetchFail.callableBlocking.begin 
log -> fetchFail.callableBlocking.while 
log -> fetchFail.callableBlocking.while 
log -> java.util.concurrent.TimeoutException:null 
log -> INFO: shutdown triggered
log -> fetchFail.callableBlocking.while 
log -> fetchFail.callableBlocking.while 
log -> fetchFail.callableBlocking.while 
log -> fetchFail.callableBlocking.while 
...

Solution

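  • When the deadline passes, joinUntil throws TimeoutException; the unfinished tasks are then interrupted when the scope is shut down (scope.shutdown() or scope.close()). Your task never reacts to that interrupt because the wait helper traps InterruptedException here: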

    try {
        Thread.sleep(1000L);
    } catch (Exception e) {
        // InterruptedException is caught and discarded here
    }
    

    Either change the catch block above to re-throw the InterruptedException wrapped in a RuntimeException, or replace wait.accept(Duration.ofSeconds(1)); with Thread.sleep(1000L); so that the InterruptedException is not ignored:

    Callable<String> callableBlocking = () -> {
        while (true) {
            log.accept("fetchFail.callableBlocking.while");
            // An interrupt makes sleep throw InterruptedException, which ends the task
            Thread.sleep(1000L);
        }
    };
    

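    You can also simplify the code by dropping the explicit shutdown/close calls: try-with-resources calls close() for you, which shuts the scope down and interrupts any unfinished tasks, and ShutdownOnFailure already shuts the scope down when a task fails: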

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        ...
    }
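
The difference between swallowing and propagating the interrupt can be seen without the incubator API. The following is a minimal sketch, assuming a plain Thread and a direct interrupt() call as a stand-in for what a scope shutdown does to unfinished tasks. The class and method names (InterruptDemo, sleepSwallowing, sleepPropagating, stopsAfterInterrupt) and the keepRunning flag are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class InterruptDemo {

    // Mirrors the helper from the question: the empty catch discards the
    // InterruptedException, so the calling loop never learns it should stop.
    static void sleepSwallowing(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (Exception e) {
            // interrupt is silently lost
        }
    }

    // One possible fix: restore the interrupt flag and re-throw as an
    // unchecked exception so the enclosing loop is aborted.
    static void sleepPropagating(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Runs a looping worker, interrupts it once, and reports whether the
    // worker terminated within a short grace period.
    static boolean stopsAfterInterrupt(Consumer<Duration> wait) {
        // Demo-only flag so the swallowing worker does not run forever.
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        Thread worker = new Thread(() -> {
            try {
                while (keepRunning.get()) {
                    wait.accept(Duration.ofMillis(50));
                }
            } catch (IllegalStateException e) {
                // loop aborted because the interrupt was propagated
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(200);   // let the worker enter its loop
            worker.interrupt();  // stand-in for the interrupt sent on shutdown
            worker.join(500);    // grace period
            return !worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            keepRunning.set(false);
        }
    }

    public static void main(String[] args) {
        System.out.println("swallowing stops:  "
                + stopsAfterInterrupt(InterruptDemo::sleepSwallowing));
        System.out.println("propagating stops: "
                + stopsAfterInterrupt(InterruptDemo::sleepPropagating));
    }
}
```

With the swallowing helper the worker keeps looping after the interrupt, which matches the console output in the question; with the propagating helper the loop ends as soon as the interrupt arrives.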