I am trying to write a custom blocking queue backed by a fixed-length array of byte arrays. I do not remove polled elements, so I changed the put method to return the byte array so that it can be written to directly (the producer thread uses a MappedByteBuffer to write directly into this byte array). I added a "commitPut()" method that simply increments the counters and sets the "lens" array. (If multiple threads were writing, this could cause concurrency problems, but I know that only one thread is writing.)
Below is what I currently have. It works if I step through it in the debugger, but if I just run it, it seems to run into locking problems. I copied, stripped down, and adjusted the ArrayBlockingQueue code. Could someone with more experience please look at the class and tell me what I am doing wrong, or how to do it better (for example, write directly to the buffer and set the lengths array and counters in the same step)?
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ByteArrayBlockingQueue {
    private final int[] lens;       // valid length of each byte array
    private final byte[][] items;   // array of byte arrays
    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;
    public volatile int polledLen = 0; // length of last polled byte array
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    // Circularly increment i.
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    // Waits for a free slot and returns it so the producer can fill it in place.
    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    // Publishes the slot returned by put() with its valid length.
    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns the next filled slot (or null if empty) and frees it immediately.
    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
If the queue wraps around, byte arrays can be reused and overwritten before consumers have read them. In short, you would need a commitGet method to make sure that producers wait for consumers before overwriting an array with new data.
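As a rough illustration of that idea, here is a minimal sketch of a queue where the slot returned by poll() stays reserved until the consumer calls commitGet(). It assumes exactly one producer thread and one consumer thread; the class name TwoPhaseByteQueue and the polledLen() accessor are made up for this example, and poll() is kept non-blocking as in the question.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: single producer, single consumer only.
final class TwoPhaseByteQueue {
    private final byte[][] items;
    private final int[] lens;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();

    TwoPhaseByteQueue(int capacity, int size) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new byte[capacity][size];
        lens = new int[capacity];
    }

    // Blocks until a slot is free, then hands it to the producer to fill.
    byte[] put() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await();
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    // Publishes the slot obtained from put() with its valid length.
    void commitPut(int len) {
        lock.lock();
        try {
            lens[putIndex] = len;
            putIndex = (putIndex + 1) % items.length;
            count++;
        } finally {
            lock.unlock();
        }
    }

    // Returns the oldest filled slot, or null if empty.
    // The slot is NOT released here, so the producer cannot overwrite it yet.
    byte[] poll() {
        lock.lock();
        try {
            return count == 0 ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // Valid length of the slot last returned by poll().
    int polledLen() {
        lock.lock();
        try {
            return lens[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // Releases the slot returned by poll() so the producer may reuse it.
    void commitGet() {
        lock.lock();
        try {
            if (count == 0) throw new IllegalStateException("nothing to commit");
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

The only difference from the question's poll() is that the index and counter updates move into commitGet(), so a full queue keeps the producer waiting until the consumer is really done with the array.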
However, my suggestion is to rely on java.util.concurrent.BlockingQueue, with a second queue to return buffers from the consumers back to the producer, and on java.nio.ByteBuffer to keep track of lengths. The producer would do as follows:
ByteBuffer buffer = bufferQueue.take(); // blocks until a buffer is free; instead of your put()
buffer.put(source);                     // fill buffer from source MappedByteBuffer
buffer.flip();                          // set length to the amount written
dataQueue.offer(buffer);                // instead of commitPut()
The consumer would:
ByteBuffer buffer = dataQueue.take(); // blocks until data is available; instead of your poll()
buffer.get(...);                      // use data
buffer.clear();                       // reset length
bufferQueue.offer(buffer);            // this is the missing commitGet()
You should initially insert capacity elements into bufferQueue. Note, however, that this will still copy the data once from the source buffer into the temporary buffers in the queues, as your original code already did.
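Putting the producer and consumer steps together, a minimal sketch of the two-queue arrangement could look like the following. The class name BufferExchange and its methods produce(), take(), and release() are invented for this illustration, and heap buffers of a fixed size are an assumption; only the BlockingQueue and ByteBuffer calls are standard library API.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper wrapping the two queues described above.
final class BufferExchange {
    private final BlockingQueue<ByteBuffer> bufferQueue; // free buffers
    private final BlockingQueue<ByteBuffer> dataQueue;   // filled buffers

    BufferExchange(int capacity, int size) {
        bufferQueue = new ArrayBlockingQueue<>(capacity);
        dataQueue = new ArrayBlockingQueue<>(capacity);
        // Pre-fill the free queue with `capacity` reusable buffers.
        for (int i = 0; i < capacity; i++) {
            bufferQueue.add(ByteBuffer.allocate(size));
        }
    }

    // Producer: waits for a free buffer, copies the remaining bytes of
    // `source` into it, and publishes it. Throws BufferOverflowException
    // if `source` holds more bytes than one buffer can take.
    void produce(ByteBuffer source) throws InterruptedException {
        ByteBuffer buffer = bufferQueue.take();
        buffer.put(source);
        buffer.flip(); // limit now marks the number of bytes written
        dataQueue.put(buffer);
    }

    // Consumer: waits for the next filled buffer.
    ByteBuffer take() throws InterruptedException {
        return dataQueue.take();
    }

    // Consumer: hands the buffer back once it has been fully read.
    void release(ByteBuffer buffer) throws InterruptedException {
        buffer.clear();
        bufferQueue.put(buffer);
    }
}
```

Because only `capacity` buffers ever exist, neither put() call can block for long: a buffer is always in exactly one queue or in the hands of one thread.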
If you really don't want to copy data (and are sure that the source does not change until all consumers have read it!), a better option is to use a single blocking queue and insert buffers obtained with ByteBuffer.slice() from your source buffer, one for each chunk of data to be handed down to consumers. These slices will eventually be garbage collected, but they are only small views onto the source and should take much less memory than the byte arrays themselves.
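A minimal sketch of that zero-copy variant, assuming the producer knows the offset and length of each chunk within the source buffer. The class SliceProducer and the method publishChunk() are hypothetical names; the read-only wrapper is an extra precaution, not something the approach requires.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: publishes views of a shared source buffer.
final class SliceProducer {
    // Enqueues a view of source[offset, offset + length) without copying bytes.
    static void publishChunk(ByteBuffer source, int offset, int length,
                             BlockingQueue<ByteBuffer> queue) throws InterruptedException {
        // duplicate() shares the content but has its own position and limit,
        // so the source buffer's own state is left untouched.
        ByteBuffer view = source.duplicate();
        view.clear();                 // position = 0, limit = capacity
        view.limit(offset + length);
        view.position(offset);
        // slice() yields a buffer whose index 0 is `offset` in the source.
        queue.put(view.slice().asReadOnlyBuffer());
    }

    public static void main(String[] args) throws InterruptedException {
        ByteBuffer source = ByteBuffer.wrap(new byte[] {10, 20, 30, 40, 50});
        BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
        publishChunk(source, 1, 3, queue);
        ByteBuffer chunk = queue.take();
        System.out.println(chunk.remaining()); // number of bytes in the chunk
    }
}
```

Each queued slice still refers to the source's memory, so any later write to that region of the source is visible to consumers; that is why the source must stay unchanged until they have finished reading.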