I have one thread writing to my concurrenthashmap, and another thread reading these blocks (not removing them).
A similiar question was given here that almost provides a solution, but doesn't quite apply to my situation.
I have this class:
public class BlockStore {
private ConcurrentHashMap<Integer, Long> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
public boolean hasBlock(int blockNumber) {
return blockToOffset.containsKey(blockNumber);
}
public void getBlock(int blockNumber, byte[] buffer) throws BlockNotFoundException, IOException {
if (hasBlock(blockNumber)) {
long offset = blockToOffset.get(blockNumber);
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} else throw new BlockNotFoundException();
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
blockToOffset.put(blockNumber, offset);
blockToOffset.notifyAll();
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}
E.g. given the following situation
1: thread2.getBlock(0, emtpyBuff) // block doesn't exist so thread waits
2. thread1.writeBlock(0, buffWithData) // writes data block to file, adds block to hashmap.
3: thread2 is notified that block now exists, retrieves offset from hashmap, writes corresponding file block to emptyBuff, and goes on to do stuff.
The main difference between the this and the other question is that I'm not wanting to return the value of the hashmap directly, but perform further operations on it within the blockstore class to fetch the actual file data.
Thanks! :)
EDIT:
I've considered simply polling the blockstore from thread2 until a return value is given, but that results in a lot of unecessary CPU usage.
It seems there's not a 'nice' solution for this problem, and so I adapted the referenced question to match my own. This is only a safe solution when one thread is reading from the blocking queue as the poll then return value in getBlock() is not thread safe.
public class BlockStore {
private ConcurrentHashMap<Integer, BlockingQueue<Long>> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
private synchronized BlockingQueue<Long> ensureQueueExists(int key) {
if (blockToOffset.containsKey(key)) {
return blockToOffset.get(key);
} else {
BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1);
blockToOffset.put(key, queue);
return queue;
}
}
public void getBlock(int blockNumber, byte[] buffer) {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
try {
long offset = queue.poll(60L, TimeUnit.SECONDS);
queue.add(offset); // Put offset back into queue. Since get will only be called by one thread, this does not result in a race condition
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
queue.add(offset);
blockToOffset.put(blockNumber, queue);
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}