java garbage-collection blockingqueue

How to poll() and remove item from ArrayBlockingQueue without memory allocation


I am trying to poll an ArrayBlockingQueue to get the item:

    public void run() {
        while (true) {
            if (Thread.currentThread().isInterrupted()){
                closeAppender();
                break;
            }

            if (!TASK_BUFFER.isEmpty() && !TASK_ADDRESS.isEmpty() && !TASK_MARKER.isEmpty()){
                buffer = TASK_BUFFER.poll();
                remoteAdd = TASK_ADDRESS.poll();
                marker = TASK_MARKER.poll();

//                if (buffer.hasArray()){  //if byte buffer is used just get array
//                    byteArr = buffer.array();
//                }else{
//                    byteArr = directArr;  //if direct buffer is used, we get into direct arr
////                    Arrays.fill(byteArr, (byte)0);  //reset every value to "null" value
//                    buffer.get(byteArr, 0, buffer.remaining());
//                }
//                remaining = buffer.remaining();
//                runTask();
                BUFFER_POOL.returnBuffer(buffer);
            }
        }
        LOGGER.info(mainMarker,"Task Scheduler Thread has terminated...");
    }

However, when I run jvisualvm, I can see that whenever I "schedule" a task and poll the queue, it is allocating bytes. I have tried commenting and uncommenting the other parts of the code but only the polling and scheduling will affect the allocated bytes.

[Screenshot: jvisualvm]

I have checked this link but I am not looking to iterate the whole array. I am looking to get one item at a time to process then remove it. What is the proper way to approach this?

Edit: I have a separate thread that produces instances of ByteBuffer, InetSocketAddress and Marker and puts them into the ArrayBlockingQueues. I also checked the source code of poll() and dequeue(), and it doesn't seem to do anything that should warrant allocating bytes. If I create the objects in the main thread and add them to the queues, this thread shouldn't allocate anything, since it is only used for processing the queue values.

The constructor looks like this:

    private final BlockingQueue<ByteBuffer> TASK_BUFFER;
    private final BlockingQueue<InetSocketAddress> TASK_ADDRESS;
    private final BlockingQueue<Marker> TASK_MARKER;
    private final BufferPool BUFFER_POOL;
    private final ExcerptAppender APPENDER;

    public TaskScheduler(BufferPool bufferPool, ChronicleQueue chronicleQueue){
        TASK_BUFFER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
        TASK_ADDRESS = new ArrayBlockingQueue<>(bufferPool.getCapacity());
        TASK_MARKER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
        BUFFER_POOL = bufferPool;
        APPENDER = chronicleQueue.acquireAppender();
    }

and this puts the produced values inside the queue:

    public void scheduleTask(final ByteBuffer buffer, final InetSocketAddress remoteAdd, final Marker marker) throws InterruptedException {
        TASK_BUFFER.put(buffer);
        TASK_ADDRESS.put(remoteAdd);
        TASK_MARKER.put(marker);
    }

Solution

  • Whenever an attempt is made to acquire a ReentrantLock, there is potentially a memory allocation. When there is no contention, or the lock is released during the spinning phase, the lock() method returns without allocating. But when the thread has to be added to the queue of waiting threads, a node object is allocated for it.

    This applies to all classes using a ReentrantLock under the hood, like ArrayBlockingQueue, but also to all concurrency classes using AbstractQueued[Long]Synchronizer behind the scenes in general.

    So your code looks like a premature optimization that backfired. To avoid creating a simple object holding the ByteBuffer, InetSocketAddress, and Marker belonging to a task, you use three blocking queues. That requires not only three poll() calls but also three isEmpty() checks to guard them, and each of those six calls acquires the queue's lock. In the worst case you end up with up to six allocations, instead of the one you would have with a single poll() and a null check on a single queue of objects holding all three values.

    But the entire loop is literally a polling loop, repeatedly calling poll() without ever giving up the CPU. It creates the very overhead it was apparently supposed to avoid. Calling take() instead, which only returns when there is a new element, would avoid this. It queues the thread when there is no new element, but as said, a thread gets queued anyway when there is contention on the lock, and repeatedly calling poll() in a tight loop is a recipe for contention sooner or later. When there is a new element, on the other hand, take() works exactly the same as poll().
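
The two suggestions above (one queue of holder objects, and a blocking take() instead of a busy poll() loop) could be combined roughly as follows. This is only a sketch under assumptions: Task, SingleQueueScheduler, process() and runDemo() are hypothetical names that do not appear in the question, the marker is typed as Object because the question's Marker type is not available here, and the processed counter exists purely so the demo can observe progress. The question's buffer pool and appender handling are left out.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder bundling the three values that belong to one task. */
final class Task {
    final ByteBuffer buffer;
    final InetSocketAddress remoteAdd;
    final Object marker; // assumption: stands in for the question's Marker type

    Task(ByteBuffer buffer, InetSocketAddress remoteAdd, Object marker) {
        this.buffer = buffer;
        this.remoteAdd = remoteAdd;
        this.marker = marker;
    }
}

/** Sketch of a scheduler that uses one queue and blocks in take(). */
final class SingleQueueScheduler implements Runnable {
    private final BlockingQueue<Task> tasks;
    final AtomicInteger processed = new AtomicInteger(); // demo-only counter

    SingleQueueScheduler(int capacity) {
        tasks = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: one put() instead of three. */
    void scheduleTask(ByteBuffer buffer, InetSocketAddress remoteAdd, Object marker)
            throws InterruptedException {
        tasks.put(new Task(buffer, remoteAdd, marker));
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Blocks until a task is available; no isEmpty() check and no spinning.
                Task task = tasks.take();
                process(task);
            }
        } catch (InterruptedException e) {
            // take() reports interruption, replacing the explicit isInterrupted() check.
            // Cleanup such as the question's closeAppender() would go here.
            Thread.currentThread().interrupt();
        }
    }

    /** Placeholder for the real work (runTask() in the question). */
    private void process(Task task) {
        processed.incrementAndGet();
    }

    /** Small demo: schedules taskCount tasks and returns how many were processed. */
    static int runDemo(int taskCount) {
        SingleQueueScheduler scheduler = new SingleQueueScheduler(8);
        Thread worker = new Thread(scheduler, "task-scheduler");
        worker.start();
        try {
            for (int i = 0; i < taskCount; i++) {
                scheduler.scheduleTask(ByteBuffer.allocate(16),
                        InetSocketAddress.createUnresolved("localhost", 9000),
                        "marker-" + i);
            }
            // Wait (with a timeout) until the worker has drained the queue.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (scheduler.processed.get() < taskCount && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            worker.interrupt(); // ends the loop via InterruptedException in take()
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return scheduler.processed.get();
    }
}
```

Note that this sketch allocates one Task per scheduled item. If that allocation matters, the holder objects could presumably be pooled and reused in the same way the buffers already are.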