Obtaining a Synchronized List of BlockingQueues in Java


I'd like to work with the following data structure:

List<BlockingQueue<AtomicInteger>> listOfQueues = 
    Collections.synchronizedList(
        new ArrayList<ArrayBlockingQueue<AtomicInteger>>(15)
    );

So I'd like to construct a list of initially empty BlockingQueues, where each BlockingQueue holds AtomicIntegers. I know that BlockingQueue is an interface that has to be implemented by a concrete class such as ArrayBlockingQueue.

It's also important that the synchronizedList contains 15 elements.

I printed the size of the synchronizedList, and it was 0.

How can I fix these issues?


Solution

  • Specifying the capacity argument of the ArrayList constructor only changes the size of its underlying array. It has no impact on the contents of the list, so size() still returns 0. You need to add the queues manually.

    Also note that some implementations of BlockingQueue, such as ArrayBlockingQueue, require the capacity of the queue to be specified in the constructor.

    Here's how it can be done with the Stream API, using the collector collectingAndThen() with toList() as the downstream collector and a finisher function that produces a synchronized List.

    final int capacity = // the required capacity of the queues
    final int limit = // required number of queues
        
    var listOfQueues = 
        Stream.generate(() -> new ArrayBlockingQueue<AtomicInteger>(capacity))
            .limit(limit)
            .collect(Collectors.collectingAndThen(
                Collectors.toList(),
                Collections::synchronizedList
            ));
    

    Or you can do it using a plain for loop like this:

    final int capacity = // the required capacity of the queues
    final int limit = // required number of queues
        
    List<ArrayBlockingQueue<AtomicInteger>> listOfQueues = new ArrayList<>();
    for (int i = 0; i < limit; i++) {
        listOfQueues.add(new ArrayBlockingQueue<>(capacity));
    }
            
    var synchronizedListOfQueues = Collections.synchronizedList(listOfQueues);
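
For reference, here is a minimal self-contained sketch that puts the points above together. It shows that the constructor argument does not change size(), and it declares the list against the BlockingQueue interface, as in the question. The class name QueueListDemo, the helper newQueueList, and the queue capacity of 10 are assumptions made for illustration; only the count of 15 comes from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class QueueListDemo {

    // Hypothetical helper: builds a synchronized list holding `count` empty
    // queues, each bounded to `capacity` elements.
    public static List<BlockingQueue<AtomicInteger>> newQueueList(int count, int capacity) {
        // `count` here only pre-sizes the backing array; the loop adds the elements.
        List<BlockingQueue<AtomicInteger>> queues = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            queues.add(new ArrayBlockingQueue<>(capacity));
        }
        return Collections.synchronizedList(queues);
    }

    // Counts the elements across all queues. Iterating over a list returned by
    // Collections.synchronizedList must be guarded by the caller.
    public static int totalElements(List<BlockingQueue<AtomicInteger>> queues) {
        int total = 0;
        synchronized (queues) {
            for (BlockingQueue<AtomicInteger> queue : queues) {
                total += queue.size();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Capacity is not size: this list is still empty.
        List<BlockingQueue<AtomicInteger>> empty = new ArrayList<>(15);
        System.out.println(empty.size()); // prints 0

        // 15 queues as in the question; capacity 10 is an arbitrary choice.
        List<BlockingQueue<AtomicInteger>> queues = newQueueList(15, 10);
        System.out.println(queues.size()); // prints 15

        queues.get(0).offer(new AtomicInteger(42));
        System.out.println(totalElements(queues)); // prints 1
    }
}
```

Single calls such as get() or add() on the synchronized list are thread-safe on their own, but iteration is not, which is why totalElements wraps its loop in a synchronized block on the list.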