Search code examples
javaconcurrencysetactivemq-classiccopyonwritearraylist

Properly iterating over queues from ActiveMQ DestinationSource.getQueues response


For some reason in the following code, destinationSource.getQueues() is returning a CopyOnWriteArraySet instead of a simple Set. This is a problem because the for loop begins to process before the Set is full and due to the nature of CopyOnWriteArraySet it will only process the items in the Set before the loop. I know I can throw a Thread.sleep() in there but that doesn't fix the underlying problem. Is there any reason it would be returned as a CopyOnWriteArraySet instead of a Set? Also is there any way to iterate over a CopyOnWriteArraySet to ensure all items would be covered, even ones added during the iteration?

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnection.start();
DestinationSource destinationSource = activeMQConnection.getDestinationSource();

Set<ActiveMQQueue> queues = destinationSource.getQueues();

for(ActiveMQQueue queue : queues) {
  queueNames.add(queue.getPhysicalName());
}

activeMQConnection.close()

Edit: Here is the solution I came up with, while its not perfect it ensures that you will get all the queues up until there is more than 1 second between queues being added.

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

    ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();

    activeMQConnection.start();

    DestinationSource destinationSource = activeMQConnection.getDestinationSource();

    Set<ActiveMQQueue> queues = destinationSource.getQueues();

    do {
        for(ActiveMQQueue queue : queues) {
            String physcialName = queue.getPhysicalName();
            if(!queueNames.contains(physcialName)) {
                queueNames.add(physcialName);
            }
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log(e.toString());
        }
    }while(queueNames.size() < queues.size());

    activeMQConnection.close();

Solution

  • I had the same issue with getting all queues from a connection. Whenever i got the queues from the DestinationSource and then iterated afterwards (foreach) over this set, i got different number of queues (In the iteration loop i always get more queues than in the initial set).

    DestinationSource ds = connection.getDestinationSource();
    Set<ActiveMQQueue> queues = ds.getQueues();
    log.debug("Found '" + queues.size() + "' queues");
    for (ActiveMQQueue queue : queues) {...}
    

    Then, i added a listener to the destination source like this

    DestinationSource ds = connection.getDestinationSource();
    Set<ActiveMQQueue> queues = ds.getQueues();
    // Add listener:
    ds.setDestinationListener(event -> event.hashCode());
    log.debug("Found '" + queues.size() + "' queues");
    for (ActiveMQQueue queue : queues) {...}
    

    From now on, i always get the right number of queues and can iterate over the complete set.

    Allthough, i don't really know why ;)