Can you access a Hazelcast Queue from within an ItemListener?


I have a use case where I have a set of items, DiagnosticRuns, that are submitted to my cluster. I want to process them serially (to avoid conflicts). I am trying to use a Hazelcast Queue protected by a Lock to make sure the items are processed one at a time. Hazelcast is running in embedded mode in my cluster. If I register an ItemListener with the Queue, is it safe to call take() on the Queue from within the itemAdded() method? For example:

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<DiagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;
    private String diagnosticRunListenerId;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
        diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
    }

    @PreDestroy
    public void stop()
    {
        diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
    }

    public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
    {
        @Override
        public void itemAdded(ItemEvent<DiagnosticRun> item)
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
                if(diagnosticRun != null)
                {
                    productVersioningService.updateProductDeviceTable(diagnosticRun);
                }
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }

        @Override
        public void itemRemoved(ItemEvent<DiagnosticRun> item)
        {
        }
    }
}

I'm not sure whether it's thread-safe to call take() (or poll(), as in the code above) on the Queue from that location and thread.

If that is not allowed, I'll have to set up my own long-running loop to poll() the Queue. I'm not sure what's the best way to set up a long-running thread in a Spring Boot application. Assuming the method above does not work, would the below code be threadsafe? Or is there a better way to do this?

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<DiagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;

    private ExecutorService executorService;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");

        executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> listenToDiagnosticRuns());
    }

    @PreDestroy
    public void stop()
    {
        executorService.shutdown();
    }

    private void listenToDiagnosticRuns()
    {
        while(!executorService.isShutdown())
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
                productVersioningService.updateProductDeviceTable(diagnosticRun);
            }
            catch(InterruptedException e)
            {
                logger.error("Interrupted polling diagnosticRun queue", e);
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }
    }
}

Solution

  • I'll start with a caveat: I'm not an expert on exactly which threads these callbacks are executed on and when, so some may disagree. This looks like an interesting case, so anyone is welcome to chime in. Here are my thoughts.

    Your first solution mixes Hazelcast's event threading with its operation threading. In fact, a single event triggers three operations (presumably the lock, the poll, and the unlock). If you add some arbitrary latency to your call to updateProductDeviceTable, you'll see that processing eventually slows down and then picks up again after some time, because your local event queue piles up while the operations are invoked.

    You could instead put everything you're doing in a separate thread that you "wake" up from itemAdded(), or, if you can afford a bit of latency, do what you're doing in your second solution. I would, however, make a couple of changes to the listenToDiagnosticRuns() method:

    private void listenToDiagnosticRuns()
    {
        while(!executorService.isShutdown())
        {
            if(diagnosticRunQueue.peek() != null)
            {
                diagnosticRunLock.lock(5, TimeUnit.SECONDS);
                try
                {
                    DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
                    if(diagnosticRun != null)
                    {
                        productVersioningService.updateProductDeviceTable(diagnosticRun);
                    }
                }
                catch(InterruptedException e)
                {
                    logger.error("Interrupted polling diagnosticRun queue", e);
                }
                finally
                {
                    diagnosticRunLock.unlock();
                }
            } // peek != null
            else
            {
                try
                {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e)
                {
                    // do nothing
                }
            }
        }
    }
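
The other option mentioned above, a separate thread that is woken from itemAdded(), might look roughly like the sketch below. It is only an illustration under stated assumptions: WakeableWorker and wake() are hypothetical names, a plain java.util.concurrent.BlockingQueue stands in for the Hazelcast IQueue, and a Consumer stands in for updateProductDeviceTable. In the real listener, itemAdded() would do nothing except call wake(), and the lock/poll/unlock sequence would run inside the worker loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: drains a queue on its own thread; listeners only signal it. */
final class WakeableWorker<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;   // assumption: stand-in for the Hazelcast IQueue
    private final Consumer<T> handler;      // assumption: stand-in for updateProductDeviceTable
    private final Semaphore wakeUps = new Semaphore(0);
    private final Thread thread;
    private volatile boolean running = true;

    WakeableWorker(BlockingQueue<T> queue, Consumer<T> handler) {
        this.queue = queue;
        this.handler = handler;
        this.thread = new Thread(this::run, "diagnostic-run-worker");
        this.thread.start();
    }

    /** Intended to be called from itemAdded(); it never blocks the calling thread. */
    void wake() {
        wakeUps.release();
    }

    private void run() {
        while (running) {
            try {
                // Wait for a signal; the timeout is a safety net in case an event is missed.
                wakeUps.tryAcquire(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                return; // close() interrupted the wait
            }
            // Collapse any extra signals, then drain everything currently queued.
            wakeUps.drainPermits();
            T item;
            while (running && (item = queue.poll()) != null) {
                handler.accept(item);
            }
        }
    }

    /** Stops the worker and waits for the item in progress, if any, to finish. */
    @Override
    public void close() {
        running = false;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because wake() only releases a semaphore permit, the thread that delivers the event returns immediately, and all blocking work happens on the worker thread. A signal that arrives while the worker is busy leaves a permit behind, so the worker loops once more instead of missing the item.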