I'm facing a problem with what the title suggests.
I have a @Component
that monitors "Processes". Once the first Process is registered in the component, I initialize my ScheduledExecutorService
:
ScheduledExecutorService pollingThread = Executors.newSingleThreadScheduledExecutor();
And with pollingThread.scheduleAtFixedRate(monitorProcesses, 60, 60, TimeUnit.SECONDS)
I set up the Runnable
"monitorProcesses" to be executed every 60 seconds, where I basically poll a third party API and loop each registered Process in my component to check whether they have finished or not.
If a process finishes, I want a new Thread to do some logic without blocking the "polling" Thread. Here is a code snippet.
/**
* Starts the monitoring of the processes.
*/
private void startMonitoring() {
pollingThread = Executors.newSingleThreadScheduledExecutor();
Runnable monitorProcesses = () -> {
// 1. Get processes queue
Map<String, ProcessStatus> queue;
do {
try {
queue = getProcessesStatusQueue(); // API request
} catch (Exception e) {
logger.warn("ProcessMonitor >> Unable to get processes queue. Trying again in the next cycle.");
return;
}
} while (queue == null);
// 2. Loop registered processes and check for changes in their status
for (Process process : getRegisteredProcesses()) {
try {
ProcessStatus processStatus = queue.get(process.getId());
syncProcess(process, processStatus);
} catch (Exception e) {
onUnexpectedError(process, e);
}
}
// 3. Check if all processes finished
if (getRegisteredProcesses().isEmpty()) { // synchronized method
stopMonitoring(); // executes pollingThread.shutDown() and some more stuff
}
};
// Start polling
pollingThread.scheduleAtFixedRate(monitorProcesses, 60, 60, TimeUnit.SECONDS);
isMonitoringActive = true;
}
private void syncProcess(
@NotNull Process process, ProcessStatus processStatus) {
//
// some logic
//
boolean condition = checkProcessEnded(process, processStatus);
if (condition) {
new Thread(() -> processService.onProcessesEnded(process)).start();
}
}
The problem I'm facing is when a first Process process
finishes, and therefore a new Thread is created to execute processService.onProcessesEnded(process)
, right afterwards my main loop 2. doesn't continue and the "polling thread" stops. So for example, if I have 10 processes, and the first one ends, the 9 remaining processes are never "processed" (forgive the redundancy).
I don't see any exceptions being thrown in my logs.
Any help would be highly appreciated!
So, looks like I've solved my concurrency issue. It wasn't about the Thread creation. The problem occurred when processes were registered while the polling thread was iterating over the HashMap
where I was storing my registered processes.
You either have to use a ConcurrentHashMap
, which is a Map
adapted for concurrency and can handle insertions while iterating over it, or synchronize the loop block and the methods where you register the elements of the map, so no elements are inserted in the HashMap while it's being iterated over:
Synchronize example with HashMap
Map<String, Process> registeredProcesses = new HashMap<>();
private void startMonitoring() {
...
...
// Lock map
synchronize (registeredProcesses) {
// 2. Loop registered processes and check for changes in their status
for (Process process : registeredProcesses.values()) {
try {
ProcessStatus processStatus = queue.get(process.getId());
syncProcess(process, processStatus);
} catch (Exception e) {
onUnexpectedError(process, e);
}
}
}
...
...
}
private void register(Process process) {
synchronize (registeredProcesses) {
registeredProcesses.put(process.getId(), process);
}
}
I've chosen the ConcurrentHashMap
option, because I don't want the threads that register processes to block.