Search code examples
javamultithreadinghazelcast

Hazelcast - QueueListener - multithreading or MDB


I'm using hazel cast IMGD for my app. I have used queues for internal communication. I added an item listener to queue and it works great. Whenever a queue gets a message, listener wakes up and needed processing is done.

Problem is its single threaded. Sometimes, a message takes 30 seconds to process and messages in queue just have to wait until previous message is done processing. I'm told to use Java executor service to have a pool of threads and add an item listener to every thread so that multiple messages can be processed at same time.

Is there any better way to do it ? may be configure some kind of MDB or make the processing asynchronous so that my listener can process the messages faster

@PostConstruct
public void init() {
    logger.info(LogFormatter.format(BG_GUID, "Starting up GridMapper Queue reader"));

    HazelcastInstance hazelcastInstance = dc.getInstance();
    queue = hazelcastInstance.getQueue(FactoryConstants.QUEUE_GRIDMAPPER);
    queue.addItemListener(new Listener(), true);

}

    class Listener implements ItemListener<QueueMessage> {

        @Override
        public void itemAdded(ItemEvent<QueueMessage> item) {
            try {
                QueueMessage message = queue.take();

                processor.process(message.getJobId());
            } catch (Exception ex) {
                logger.error(LogFormatter.format(BG_GUID, ex));
            }

        }

        @Override
        public void itemRemoved(ItemEvent<QueueMessage> item) {
            logger.info("Item removed: " + item.getItem().getJobId());
        }

    }

Solution

  • Hazelcast IQueue does not support asynchronous interface. Anyway, asynchronous access would not be faster. MDB requires JMS, which is pure overhead. What you really need is multithreaded executor. You can use default executor:

        private final ExecutorService execService = ForkJoinPool.commonPool();