I have recently added a new worker thread which drains a queue into a different table, then exits. While it is running, I'm seeing a lot of the following log message:
2021-05-16 11:25:19.496 WARN 18 --- [Thread-1] com.hazelcast.spi.EventService : [127.0.0.1]:5701 [dev] [3.12.6] EventQueue overloaded! TopicEvent{name='default-update-timestamps-region', publishTime=1621164319495, publisherAddress=[172.18.0.4]:5701} failed to publish to hz:impl:topicService:default-update-timestamps-region
Neither of the entities that I am reading from or writing to is cached, so I'm wondering why the cache is being refreshed at all on this thread, let alone how it's blowing the limits of this EventQueue.
I have not changed the configured defaults (using Hazelcast 3.12.6), so I'm confused about how this could hydrate the cache so quickly.
See below for rough pseudocode for my new service:
    private void processForever() {
        threadRef = Thread.currentThread();
        synchronized (syncObject) {
            // Notify init that we're good to continue
            syncObject.notifyAll();
        }
        // Keep looping for as long as threadRef still points at this thread
        while (threadRef == Thread.currentThread()) {
            boolean foundWork = false;
            try {
                foundWork = process();
            } catch (Exception e) {
                log.debug("stack", e);
            }
            // Poll again after 1s if there was work, otherwise back off for 60s
            long sleep = foundWork ? 1000 : 60000;
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                // Woken early; the loop condition decides whether to exit
            }
        }
    }
    private boolean process() {
        try {
            // N.B. this attempts to grab a shared lock on the current tenant and skips if already taken
            return dataGrid.runExclusiveForCurrentTenantOrSkip(LockName.PROCESS, this::processInternal).orElse(true);
        } catch (Exception ex) {
            log.error("error", ex);
            return true;
        }
    }
    private boolean processInternal() {
        Long maxSid = sourceQueueRepo.findMaxSid();
        if (maxSid == null) {
            return false;
        }
        Set<Worker> agents = workerRepo.findAllWorkers();
        queueWork(maxSid, agents);
        return true;
    }
    public void queueWork(Long maxId, Set<Worker> workers) {
        sourceQueueRepo.dedupeByMaxSid(maxId);
        List<SourceQueue> batch = sourceQueueRepo.findAllBySidLessThanEqual(maxId);
        Map<Long, List<SourceQueue>> batched = // Redacted
        for (Worker worker : workers) {
            // Method 'batchInsert' calls a save query (transactional).
            // The default must be a List to match the map's value type.
            batchInsert(worker, batched.getOrDefault(Type.TYPE_1, new ArrayList<>()));
            batchInsert(worker, batched.getOrDefault(Type.TYPE_2, new ArrayList<>()));
            batchInsert(worker, batched.getOrDefault(Type.TYPE_3, new ArrayList<>()));
        }
        sourceQueueRepo.deleteByMaxId(maxId);
    }
N.B.
I tracked the issue down to the use of HazelcastLocalCacheRegionFactory. My new thread was under a lot of stress, and the cache region was flooding the EventQueue as it propagated its events.
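As a rough illustration of the failure mode only (this is not Hazelcast's implementation, and the class name, capacity, and event count below are hypothetical), a bounded queue that is offered events faster than it is drained starts rejecting them, which is what the "EventQueue overloaded" warning reports:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedEventQueueSketch {

    /**
     * Offers `events` items to a queue of the given capacity while nothing
     * drains it, and returns how many offers were rejected.
     */
    static int countRejected(int capacity, int events) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < events; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!queue.offer("timestamp-update-" + i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: a burst of 1500 events against a capacity of 1000
        System.out.println(countRejected(1000, 1500));
    }
}
```

In this sketch every event beyond the capacity is dropped because nothing consumes the queue; in a real system a consumer drains it, so the overflow only shows up when the publisher outpaces that consumer.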
I opted not to switch to HazelcastCacheRegionFactory because, for my scenario, it had an unacceptable impact on performance.
Instead, for the @Modifying queries I chose to use the NativeQuery object and specify the cache item I was invalidating via addSynchronizedEntityClass.
E.g.:
    return em.createNativeQuery("DELETE FROM dbo.SourceQueue WHERE id <= :id")
            .unwrap(NativeQuery.class)
            .setParameter("id", id)
            .addSynchronizedEntityClass(SourceQueue.class)
            .executeUpdate();