I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT). I have more classes, but I show only the essential classes for this case. I am trying to check some records in a multithreading environment using the ExecutorService and, based on the result of the checking, to continue or stop the ExecutorService.
The SecurityCheckc class:
private ExecutorService pool;
private final SendToDatabse sendToDatabse;
private final UserCheck userCheck;
private final SynchronisedCheckedMap synchronisedCheckedMap;
public SecurityCheck(SendToDatabse sendToDatabse,
UserCheck userCheck,
SynchronisedCheckedMap synchronisedCheckedMap) {
this.sendToDatabse = sendToDatabse;
this.userCheck = userCheck;
this.synchronisedCheckedMap = synchronisedCheckedMap;
}
public void securityCheck(List<String> list) throws ExecutionException, InterruptedException, TimeoutException {
List<Future<?>> futures = new ArrayList<>();
this.pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
list.forEach(line -> futures.add(pool.submit(
new Security(line,
this.userCheck,
this.synchronisedCheckedMap
))));
for (Future<?> future : futures) {
future.get(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}catch (Exception e){
throw new RuntimeException(e);
}
finally {
print();
pool.shutdown();
while (!pool.awaitTermination(24L,TimeUnit.SECONDS)){
System.out.println("Not yet. Still in waiting for termination");
}
// this.sendToDatabse.sendToDatabse(synchronisedCheckedMap.synchronisedCheckedMap);
}
}
public void stopPool(Short code, Customer customer){
System.out.println("oprire");
Map<Short, List<Customer>> map = new HashMap<>();
map.put(code, (List<Customer>) customer);
map.forEach((k,v)->{
System.out.println(k + "\t" + v);
});
this.pool.shutdownNow();
// this.sendToDatabse.sendToDatabse(map);
Thread.currentThread().interrupt();
}
The Security class:
public class Security implements SQLInjections, Runnable{
private final String line;
private final UserCheck userCheck;
private final SynchronisedCheckedMap synchronisedCheckedMap;
public Security(String line,
UserCheck userCheck,
SynchronisedCheckedMap synchronisedCheckedMap){
this.line = line;
this.userCheck = userCheck;
this.synchronisedCheckedMap = synchronisedCheckedMap;
}
@Override
public void run() {
try {
if (Thread.interrupted()) {
return;
}
JsonToLine jsonToLine = new JsonToLine();
String newLine = jsonToLine.jsonToLine(line);
StringToValues stringToValues = new StringToValues();
List<String> values = stringToValues.stringToValues(newLine);
// values.forEach(string -> {System.out.println("line: " + string);});
Short validID = validID(values.get(0));
Short validUser = null;
Short validPassword = null;
Short validMessage = null;
List<String> message = new ArrayList<>();
Customer customer;
switch (validID) {
case 200:
// values.forEach(string -> {System.out.println("code" + "\t" + "line: " + string);});
validUser = validUser(values.get(1));
validPassword = validPassword(values.get(2));
message = Arrays.asList(values.get(3).split(";"));
validMessage = validMesage(message);
customer =
new Customer(Long.parseLong(values.get(0)),
values.get(1),
values.get(2),
message);
if(validUser==200 &&
validMessage==200 &&
validPassword==200){
this.synchronisedCheckedMap.synchronisedCheckedMap((short) 200, customer);
}else{
this.synchronisedCheckedMap.synchronisedCheckedMap(
getHighestCode(List.of(validUser,validID,validMessage,validPassword)), customer);
}
break;
case 600:
validUser = validUser(values.get(0));
validPassword = validPassword(values.get(1));
message = Arrays.asList(values.get(3).split(";"));
validMessage = validMesage(message);
customer =
new Customer(null,
values.get(1),
values.get(2),
message);
this.synchronisedCheckedMap.synchronisedCheckedMap((short) 600, customer);
break;
}
}catch (Exception e){
Thread.currentThread().interrupt(); // Preserve interruption status
return;
}
}
}
I also have the SyncroniseMap
@Component
public class SynchronisedCheckedMap {
public Map<Short, List<Customer>> synchronisedCheckedMap = new HashMap<>();
private SecurityCheck securityCheck;
public SynchronisedCheckedMap(SecurityCheck securityCheck) {
this.securityCheck = securityCheck;
}
public SynchronisedCheckedMap() {}
protected synchronized void synchronisedCheckedMap(Short code,
Customer customer){
if(code>=800){
this.securityCheck.stopPool(code,customer);
}
if(this.synchronisedCheckedMap.containsKey(code)){
this.synchronisedCheckedMap.get(code).add(customer);
}else {
List<Customer> list = new ArrayList<>();
list.add(customer);
synchronisedCheckedMap.put(code,list);
}
}
public void print(){
this.synchronisedCheckedMap.forEach((k,v)->{
System.out.println(k + "\t" + v);
});
}
}
I messed it up with the dependency injections in this class because it is not working at all. The threads are still running. But I tried to put a debugger on the "stopPool" method, and is not stopping there.
I know I will have to work more on those, but I want to stop the ExecutorService from the SecurityCheck class. I am trying to call the "stopPool" method when it is a specific code that was sent to the "this.synchronisedCheckedMap.synchronisedCheckedMap".
I have tried to do it in this way, but I need help to get it to work, because the threads keep going on.
How should I stop the ExecutorService properly in this case?
private ExecutorService pool;
Regarding that name, do not think of your executor service as a thread pool.
An executor service may be backed by a pool of threads. Or it may be backed by a single thread. Or it may be backed by no threads, with execution on current thread possible with its inherited execute
method. And in Java 21+, an executor service may create a fresh new virtual thread for each submitted task, with no pooling.
So think of an executor service as, well, a service that executes code, one way or another, on your behalf.
I have more classes, but I show only the essential classes for this case.
You showed too much code, without narrowing down to a focus on your particular issue. So I can give a few pointers and suggestions to guide you. But if you want a pointed direct Answer, you need to provide a pointed direct Question.
I am trying to check some records in a multithreading environment
When you have a bunch of tasks to execute, you can submit them all at once to your executor service via invokeAll
. Just a convenience, with same effect as submitting them one at a time.
It seems you simply have a list of lines of text to be processed individually. You seem to be trying to stop tasks by stopping the executor service, which is may be clumsy. Threads cannot be directly stopped. In Java, threads are stopped cooperatively, by designing your code to test for interrupt status or some other condition. Search to learn more.
Let's define a simple example task class.
If you want to return a result from each task, use Callable
rather than Runnable
. In this case, we return a Boolean
object to indicate whether our line successfully got processed.
package work.basil.example;
import java.time.Instant;
import java.util.concurrent.Callable;
public class LineProcessor implements Callable < Boolean > {
private final String line;
public LineProcessor ( final String line ) {
this.line = line;
}
@Override
public Boolean call ( ) {
System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
Boolean success = Boolean.TRUE; // If we successfully processed this line.
return success;
}
}
ExecutorService
is now AutoCloseable
. So you can use try-with-resources syntax to automatically close the executor service when it finishes a batch of tasks.
Here we simulate getting a list of your lines. We send each to the constructor of our LineProcessor
task Callable
class. We pass those constructed task objects to our executor service. We get back a list of Future
objects. By using try-with-resources syntax, our code blocks until either the executor service has executed all tasks or has expired after a day.
List < String > lines = List.of ( "x" , "y" , "z" );
List < LineProcessor > lineProcessors = lines.stream ( ).map ( LineProcessor :: new ).toList ( );
List < Future < Boolean > > futures = null;
try (
ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
) {
futures = executorService.invokeAll ( lineProcessors );
} catch ( InterruptedException e ) {
throw new RuntimeException ( e );
}
// At this point, the executor service has closed after all its tasks are done or canceled.
If we want to examine the results, we can add a bit more code to process the Future
objects.
for ( Future < Boolean > future : futures ) {
if ( future.isDone ( ) ) {
System.out.println ( "done" );
} else if ( future.isCancelled ( ) ) {
System.out.println ( "cancelled" );
}
System.out.println ( "Call future.get here to get result" );
}
System.out.println ( "INFO - Demo done. " + Instant.now ( ) );
When run:
In thread ID: 22 at 2023-11-01T05:56:56.499449Z processing line: y
In thread ID: 23 at 2023-11-01T05:56:56.499442Z processing line: z
In thread ID: 21 at 2023-11-01T05:56:56.499460Z processing line: x
done
Call future.get here to get result
done
Call future.get here to get result
done
Call future.get here to get result
INFO - Demo done. 2023-11-01T05:56:56.512539Z
Back to the main point of your Question, which seems to ask how to interrupt the sibling tasks if one of those tasks encounters a specific situation.
In Java, threads are interrupted cooperatively. You could ask the executor service to shutdown early, and it would try to interrupt each of its task’s threads. But interrupt handling is a bit tricky. I would suggest your set up a specific flag for all your tasks to share. This flag must be thread-safe. So I would use AtomicBoolean
as a flag among your tasks that all is well and they should continue their work. Let's initialize that flag, and pass it as an argument when constructing our task objects.
List < String > lines = List.of ( "x" , "y" , "z" );
AtomicBoolean flagToContinueProcessingLines = new AtomicBoolean ( true );
List < LineProcessor > lineProcessors =
lines
.stream ( )
.map ( ( String line ) -> new LineProcessor ( line , flagToContinueProcessingLines ) )
.toList ( );
List < Future < Boolean > > futures = null;
try (
ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
) {
futures = executorService.invokeAll ( lineProcessors );
} catch ( InterruptedException e ) {
throw new RuntimeException ( e );
}
We must revise our task class LineProcessor
to take that AtomicBoolean
argument and store a reference internally.
public class LineProcessor implements Callable < Boolean > {
private final String line;
private final AtomicBoolean flagToContinueExecution;
public LineProcessor ( final String line , final AtomicBoolean flagToContinueExecution ) {
this.line = line;
this.flagToContinueExecution = flagToContinueExecution;
}
@Override
public Boolean call ( ) {
if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
boolean badSituationArose = ThreadLocalRandom.current ( ).nextBoolean ( );
if ( badSituationArose ) {
System.out.println ( "OOPS! In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " funky situation arose while processing line: " + this.line );
this.flagToContinueExecution.set ( false ); // Flip the flag to signal the other sibling tasks to halt.
return Boolean.FALSE;
}
Boolean success = true; // If we successfully processed this line.
return success;
}
}
Notice how we dropped in multiple occurrences of if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
. You mast sprinkle such calls throughout your task code if you want to get that task to voluntarily halt its work when flagged by another task.
When run:
In thread ID: 21 at 2023-11-01T06:28:12.682969Z processing line: x
In thread ID: 23 at 2023-11-01T06:28:12.682936Z processing line: z
In thread ID: 22 at 2023-11-01T06:28:12.682935Z processing line: y
OOPS! In thread ID: 22 at 2023-11-01T06:28:12.695232Z funky situation arose while processing line: y
done
Call future.get here to get result
done
Call future.get here to get result
done
Call future.get here to get result
While not related to your problem, I would suggest using virtual threads in Java 21+ for greater throughput of tasks. Do so only if your tasks involve blocking, and are not entirely CPU-bound. Blocking comes with database calls, file storage calls, network calls, etc.
To be more compatible with current implementation of virtual threads, replace your use of synchronized
with a Reentrant
lock object for any long-running code executed within your tasks.
By the way, when doing something like this switch:
switch (validID) {
case 200: …
case 600: …
}
… always include a default
case to trap for unexpected values.