Custom blocking queue locking issue


I am trying to write a custom blocking queue backed by a fixed-length array of byte arrays. Polled elements are not removed, so I changed put() to return the byte array so that it can be written to directly (the producer thread uses a MappedByteBuffer to write straight into this byte array). I added a commitPut() method that simply increments the counters and sets the entry in the lens array. (With multiple writer threads this would cause concurrency problems, but I know that only one thread is writing.)

Below is what I currently have. It works when I step through it in the debugger, but when I run it normally it seems to hit locking problems. I copied, stripped down, and adjusted the ArrayBlockingQueue code. Can someone with more experience look at the class and tell me what I am doing wrong, or how to do it better (for example, write directly to the buffer and set the lens array and counters in the same step)?

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ByteArrayBlockingQueue {

    private final int[] lens; // valid length of each byte array
    private final byte[][] items; // array of byte arrays

    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;

    public volatile int polledLen = 0; // length of last polled byte array

    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull  = lock.newCondition();
    }

    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();

            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;

        } finally {
            lock.unlock();
        }
    }
}

Solution

  • If the queue wraps around, byte arrays can be reused and overwritten before consumers have read them: poll() decrements count and signals notFull while the consumer is still holding the returned array. In short, you need a commitGet method so that the producer waits for consumers before overwriting an array with new data.
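
As a rough illustration of the commitGet idea, here is a minimal sketch. It assumes exactly one producer and one consumer thread; the class name CommitRingQueue and the polledLen() accessor are made up for this example and are not part of the original code. The key difference is that poll() no longer frees the slot; only commitGet() does.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical name; a sketch for ONE producer and ONE consumer only.
final class CommitRingQueue {
    private final byte[][] items;
    private final int[] lens;
    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0; // slots committed by the producer and not yet released

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();

    CommitRingQueue(int capacity, int size) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new byte[capacity][size];
        lens = new int[capacity];
    }

    private int inc(int i) { return (++i == items.length) ? 0 : i; }

    /** Producer: blocks until a slot is free, then returns it for writing. */
    byte[] put() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await();
            return items[putIndex];
        } finally { lock.unlock(); }
    }

    /** Producer: publishes the slot returned by put(). */
    void commitPut(int len) {
        lock.lock();
        try {
            lens[putIndex] = len;
            putIndex = inc(putIndex);
            ++count;
        } finally { lock.unlock(); }
    }

    /** Consumer: returns the oldest slot WITHOUT releasing it, or null if empty. */
    byte[] poll() {
        lock.lock();
        try { return count == 0 ? null : items[takeIndex]; }
        finally { lock.unlock(); }
    }

    /** Consumer: valid length of the slot currently returned by poll(). */
    int polledLen() {
        lock.lock();
        try { return lens[takeIndex]; } finally { lock.unlock(); }
    }

    /** Consumer: releases the polled slot so the producer may overwrite it. */
    void commitGet() {
        lock.lock();
        try {
            if (count == 0) throw new IllegalStateException("nothing to release");
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
        } finally { lock.unlock(); }
    }

    public static void main(String[] args) throws InterruptedException {
        CommitRingQueue q = new CommitRingQueue(2, 8);
        byte[] slot = q.put();
        slot[0] = 42;
        q.commitPut(1);
        byte[] got = q.poll();
        System.out.println(got[0] + " len=" + q.polledLen());
        q.commitGet(); // only now may the producer reuse the slot
    }
}
```

Until commitGet() is called, repeated poll() calls return the same slot, so the consumer must call it once per element.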

    However, my suggestion is that you rely on java.util.concurrent.BlockingQueue, with a second queue to return buffers from consumers back to the producer, and on java.nio.ByteBuffer to keep track of lengths. The producer would do as follows:

    ByteBuffer buffer = bufferQueue.take(); // instead of your put(); blocks until a buffer is free
    buffer.put(source);                     // fill buffer from source MappedByteBuffer
    buffer.flip();                          // set length to the amount written
    dataQueue.put(buffer);                  // instead of commitPut()
    

    The consumer would:

    ByteBuffer buffer = dataQueue.take();   // instead of your poll(); blocks until data arrives
    buffer.get(...);                        // use data
    buffer.clear();                         // reset position and limit
    bufferQueue.put(buffer);                // this is the missing commitGet()
    

    You should initially insert capacity buffers into bufferQueue. Note, however, that this still copies the data once from the source buffer into the temporary buffers in the queues, as your original code already did.
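
Put together, the two-queue scheme might look like the following sketch. The class name BufferPoolDemo, the constants CAPACITY and BUFFER_SIZE, and the use of short strings as stand-in data are assumptions made for this example; it uses the blocking take()/put() calls so that neither side has to handle a null return.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer thread, consumer on the calling thread.
final class BufferPoolDemo {
    static final int CAPACITY = 4;     // number of pooled buffers (assumed)
    static final int BUFFER_SIZE = 64; // bytes per buffer (assumed)

    // Sends each message through the pool and returns what the consumer read.
    // Each message must fit into BUFFER_SIZE bytes.
    static String run(String[] messages) throws InterruptedException {
        BlockingQueue<ByteBuffer> bufferQueue = new ArrayBlockingQueue<>(CAPACITY);
        BlockingQueue<ByteBuffer> dataQueue = new ArrayBlockingQueue<>(CAPACITY);
        for (int i = 0; i < CAPACITY; i++) {
            bufferQueue.put(ByteBuffer.allocate(BUFFER_SIZE)); // pre-fill the free pool
        }

        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    ByteBuffer buffer = bufferQueue.take(); // wait for a free buffer
                    buffer.put(m.getBytes(StandardCharsets.UTF_8));
                    buffer.flip();                          // limit = bytes written
                    dataQueue.put(buffer);                  // hand over to the consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        StringBuilder seen = new StringBuilder();
        for (int i = 0; i < messages.length; i++) {
            ByteBuffer buffer = dataQueue.take();           // wait for data
            byte[] out = new byte[buffer.remaining()];
            buffer.get(out);                                // use the data
            seen.append(new String(out, StandardCharsets.UTF_8));
            buffer.clear();                                 // reset position and limit
            bufferQueue.put(buffer);                        // return buffer to the pool
        }
        producer.join();
        return seen.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(new String[] {"a", "bb", "ccc", "dddd", "eeeee", "ffffff"}));
    }
}
```

Because a buffer only goes back into bufferQueue after the consumer is done with it, the producer can never overwrite data that is still being read, even with more messages than buffers.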

    If you really don't want to copy data (and are sure that the source does not change until all consumers have read it!), a better option is to use a single blocking queue and insert buffers obtained with ByteBuffer.slice() from your source buffer for each chunk of data to be handed down to consumers. The slices are small view objects that share the source's memory; they will be garbage collected after use, but should take much less memory than the byte arrays themselves.
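
The zero-copy variant could be sketched as below. The names SliceQueueDemo, chunk and decode are invented for this example, and a wrapped byte array stands in for the MappedByteBuffer. The view is created via duplicate() plus the no-argument slice(), which should work on older Java versions as well.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for handing out views instead of copies.
final class SliceQueueDemo {
    // Returns a read-only view of source bytes [offset, offset + length)
    // without copying and without touching source's own position or limit.
    static ByteBuffer chunk(ByteBuffer source, int offset, int length) {
        ByteBuffer view = source.duplicate(); // shares content, separate position/limit
        view.limit(offset + length);
        view.position(offset);
        return view.slice().asReadOnlyBuffer();
    }

    // Helper for the demo: decodes the remaining bytes as UTF-8 text.
    static String decode(ByteBuffer b) {
        return StandardCharsets.UTF_8.decode(b).toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the MappedByteBuffer from the question.
        ByteBuffer source = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
        BlockingQueue<ByteBuffer> dataQueue = new LinkedBlockingQueue<>();

        // Producer side: enqueue views, not copies.
        dataQueue.put(chunk(source, 0, 5));
        dataQueue.put(chunk(source, 6, 5));

        // Consumer side: each view covers only its own chunk.
        System.out.println(decode(dataQueue.take()));
        System.out.println(decode(dataQueue.take()));
    }
}
```

Since the views share memory with the source, this is only safe under the condition stated above: the source must not change until every consumer has finished reading its chunk.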