I'm programming in Java, and I have a List<LogEntry> log which is shared between different threads.
These "writer" threads are already synchronized with each other, so only one thread at a time can add or remove elements from log.
However, because of the distributed algorithm that I'm trying to implement, there is a portion of log which is "safe", meaning that it cannot be modified by either the writers or the reader (which I introduce below). This portion of log is indicated by the field int commitIndex, which is initialized to 0 and increases monotonically.
In conclusion, writers modify the elements of log in the range (commitIndex, log.size()), while a single reader gets the elements of log in the range [0, commitIndex]. The reader starts from the first entry and reads one entry after another until it reaches log.get(commitIndex); then it stops and goes to sleep until commitIndex is increased. It updates a field lastApplied, which is initialized to 0 and increases monotonically, in order to remember the last LogEntry that it read before going to sleep.
As you can see, there is no need to synchronize the reader and the writers, since they access different portions of log.
My question is: how can I "wake up" the reader's thread when commitIndex is increased? I'd need something like this (executed by a writer):
if (commitIndex is updated) {
    // wake up reader
}
And the reader:
public void run() {
    while (true) {
        // go to sleep...
        // now the reader is awake!
        while (lastApplied < commitIndex) {
            // do something with log.get(lastApplied)
            lastApplied++;
        }
    }
}
Obviously I have heavily simplified my code to make what I want as clear as possible. I'm sorry if it's not clear enough (and don't hesitate to ask me anything about it). Thanks!
Use a LinkedBlockingQueue<Integer> shared between the reader and all writers, so that every writer can signal the reader that the commitIndex variable has been modified:
Writers:
if (commitIndex is updated) {
    // wake up reader
    this.queue.add(commitIndex);
}
Reader:
public void run() {
    while (true) {
        // take() blocks this thread until an element is available,
        // i.e. until a writer calls add()
        int commitIndex = this.queue.take();
        // now the reader is awake!
        while (lastApplied < commitIndex) {
            // do something with log.get(lastApplied)
            lastApplied++;
        }
    }
}
Here I've used the attribute queue, which should refer to the same LinkedBlockingQueue instance in the reader and in all writers.
Note: exception handling (take() throws the checked InterruptedException) is left as an exercise.
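For reference, here is one way the pieces above could fit together in a runnable form. It is only a minimal sketch under a few assumptions of mine: the log holds String entries instead of LogEntry, it is backed by a CopyOnWriteArrayList so the demo stays safe without extra locking, a negative index is used as a made-up "stop" signal so the reader thread can terminate, and the names CommitSignalDemo, Reader and runDemo are hypothetical. Interruption is handled by restoring the interrupt flag and leaving the loop, which is one common choice but not the only one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class CommitSignalDemo {

    // Hypothetical reader: sleeps in take() and applies entries up to each commit index it receives.
    static class Reader implements Runnable {
        private final List<String> log;
        private final BlockingQueue<Integer> queue;
        final List<String> applied = new ArrayList<>();
        private int lastApplied = 0;

        Reader(List<String> log, BlockingQueue<Integer> queue) {
            this.log = log;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Blocks until a writer publishes a new commit index.
                    int commitIndex = queue.take();
                    if (commitIndex < 0) {
                        return; // assumed stop signal, not part of the original answer
                    }
                    while (lastApplied < commitIndex) {
                        applied.add(log.get(lastApplied)); // "do something" with the entry
                        lastApplied++;
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Plays the role of a single writer: appends entries, then signals the reader.
    static List<String> runDemo() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Reader reader = new Reader(log, queue);
        Thread readerThread = new Thread(reader);
        readerThread.start();

        log.add("a");
        log.add("b");
        queue.add(2);  // entries 0 and 1 are now committed
        log.add("c");
        queue.add(3);  // entry 2 is now committed
        queue.add(-1); // ask the reader to stop

        readerThread.join(); // after join() it is safe to read reader.applied
        return reader.applied;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [a, b, c]
    }
}
```

Because the reader uses the value it took from the queue rather than reading the commitIndex field directly, it never has to touch a variable that the writers are updating, and the queue hand-off makes everything the writer did before add() visible to the reader after take().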