
Hazelcast EventQueue overloaded updating cache region default-update-timestamps-region


I have recently added a new worker thread which drains a queue into different tables, then exits. While it runs, I'm seeing a lot of the following log message:

2021-05-16 11:25:19.496  WARN 18 --- [Thread-1] com.hazelcast.spi.EventService           : [127.0.0.1]:5701 [dev] [3.12.6] EventQueue overloaded! TopicEvent{name='default-update-timestamps-region', publishTime=1621164319495, publisherAddress=[172.18.0.4]:5701} failed to publish to hz:impl:topicService:default-update-timestamps-region

Neither of the entities that I am reading from/writing to is cached (nor are their tables), so I'm wondering why the cache is being refreshed at all on this thread, let alone how it's blowing the limits of this EventQueue.

I have not changed the configured defaults (using Hazelcast 3.12.6), so I'm confused as to how this thread could hydrate the cache so quickly.
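
For reference, the queue in question is Hazelcast's internal event queue. Its documented defaults can be raised via system properties, e.g. programmatically (a sketch only; this treats the symptom, not the cause):

// com.hazelcast.config.Config; defaults in 3.12 are a capacity of
// 1,000,000 events and an offer timeout of 250 ms.
Config config = new Config();
config.setProperty("hazelcast.event.queue.capacity", "2000000");
config.setProperty("hazelcast.event.queue.timeout.millis", "500");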

See below for rough pseudocode of my new service:

private void processForever() {
    threadRef = Thread.currentThread();
    synchronized (syncObject) {
        //notify init that we're good to continue
        syncObject.notifyAll();
    }
    while (threadRef == Thread.currentThread()) {
        boolean foundWork = false;
        try {
            foundWork = process();
        } catch (Exception e) {
            log.debug("stack", e);
        }

        long sleep = foundWork ? 1000 : 60000;
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            // expected: producers interrupt this thread to cut the sleep short
        }
    }
}
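
For completeness, the startup handshake that pairs with the notifyAll() above looks roughly like this (simplified; assumes threadRef is a volatile field):

private void startDrainThread() throws InterruptedException {
    synchronized (syncObject) {
        new Thread(this::processForever, "source-queue-drain").start();
        while (threadRef == null) {
            syncObject.wait(); // released by notifyAll() in processForever()
        }
    }
}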

private boolean process() {
    try {
        // N.B. this attempts to grab a shared lock for the current tenant and skips if it's already taken
        return dataGrid.runExclusiveForCurrentTenantOrSkip(LockName.PROCESS, this::processInternal).orElse(true);
    } catch (Exception ex) {
        log.error("error", ex);
        return true;
    }
}
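
(dataGrid is our Hazelcast wrapper; runExclusiveForCurrentTenantOrSkip behaves roughly like the sketch below — names hypothetical — built on Hazelcast 3.x ILock.tryLock():)

private <T> Optional<T> runExclusiveForCurrentTenantOrSkip(LockName name, Supplier<T> work) {
    // com.hazelcast.core.ILock; getLock() is the pre-CP lock API in 3.12
    ILock lock = hazelcastInstance.getLock(name + ":" + currentTenantId());
    if (!lock.tryLock()) {
        return Optional.empty(); // lock already held elsewhere: skip this run
    }
    try {
        return Optional.of(work.get());
    } finally {
        lock.unlock();
    }
}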

private boolean processInternal() {
    Long maxSid = sourceQueueRepo.findMaxSid();
    if (maxSid == null) {
        return false;
    }

    Set<Worker> agents = workerRepo.findAllWorkers();
    queueWork(maxSid, agents);

    return true;
}

public void queueWork(Long maxId, Set<Worker> workers) {

    sourceQueueRepo.dedupeByMaxSid(maxId);

    List<SourceQueue> batch = sourceQueueRepo.findAllBySidLessThanEqual(maxId);
    Map<Type, List<SourceQueue>> batched = // Redacted: groups 'batch' by Type

    for (Worker worker : workers) {
        // Method 'batchInsert' calls a save query (transactional)
        batchInsert(worker, batched.getOrDefault(Type.TYPE_1, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_2, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_3, new ArrayList<>()));
    }

    sourceQueueRepo.deleteByMaxId(maxId);
}

N.B.

  • Each query runs in its own transaction, with the goal of keeping DB transactions short, since other threads contend for the destination table.
  • The code that inserts into this queue calls interrupt() on this new thread, to make sure it drains the queue promptly (see the sketch below). Multiple threads call this, so downtime under heavy load is very infrequent.
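
Roughly, that producer-side wake-up looks like this (simplified, names assumed):

public void enqueue(SourceQueue item) {
    sourceQueueRepo.save(item);
    Thread drain = threadRef; // published by processForever()
    if (drain != null) {
        drain.interrupt(); // cuts the sleep in processForever() short
    }
}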

Solution

  • I tracked the issue down to the use of HazelcastLocalCacheRegionFactory: my new thread was under heavy load, and the cache region was flooding the EventQueue as it propagated its invalidation events.
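
    For context, the factory is selected via the standard Hibernate property; the two options (class names from hazelcast-hibernate) are sketched below:

    Properties props = new Properties();
    // Local in-memory regions; invalidations are broadcast over a Hazelcast
    // topic (the hz:impl:topicService events overflowing the queue above):
    props.setProperty("hibernate.cache.region.factory_class",
            "com.hazelcast.hibernate.HazelcastLocalCacheRegionFactory");
    // Alternative: distributed IMap-backed regions; no invalidation topic,
    // but every cache read becomes a network hop:
    // props.setProperty("hibernate.cache.region.factory_class",
    //         "com.hazelcast.hibernate.HazelcastCacheRegionFactory");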

    I opted not to switch to HazelcastCacheRegionFactory, as in my scenario that had an unacceptable impact on performance.

    Instead, for the @Modifying queries I chose to use the NativeQuery object and declare the entity being invalidated via addSynchronizedEntityClass, so that Hibernate only invalidates that entity's cache regions rather than every query space.

    E.g.:

    
    return em.createNativeQuery("DELETE FROM dbo.SourceQueue WHERE id <= :id")
        .unwrap(NativeQuery.class)
        .setParameter("id", id)
        .addSynchronizedEntityClass(SourceQueue.class)
        .executeUpdate();