Search code examples
javamultithreadingwriterreader

Many writers one reader without concurrency


I'm programming in java, and I have a List<LogEntry> logwhich is shared between different threads.

These "writers" threads are already synchronized between them, so only one thread at time can add or remove elements from log

However, because of the distributed algorithm that I'm trying to implement, there are portion of log which are "safe", which means that they cannot be modified neither by the writers or by the reader (which I introduce below) . This portion of log is indicated by the field int committedIndex, which is initialized to 0 and increase monotonically.

In conclusion, writers modify the elements in log in the range (commitIndex,log.size()), while there is a reader which get the elements in log contained in the range [0,commitIndex]. The reader starts to read from the first entry, then read the next one until he reaches log.get(commitIndex), then it stop and goes to sleep until commitIndex is increased. It updates a field lastApplied which is initialized to 0 and increases monotonically in order to remember the last logEntry that he read before going to sleep.

As you can see there is no need to synchronize the reader and the writers, since they access different portions of log.

My question is: how can I "wake up" the reader's thread when commitIndex is increased? I'd need something like this (executed by a writer):

if(commitIndex is updated)
{
     //wake up reader
}

And the reader:

public void run() {
    while(true){
        //go to sleeep...
        //now the reader is awaken!
        while(lastApplied<commitIndex){
            //do something with log.get(lastApplied)
            lastApplied++;
        }
    }

Obviously I terribly simplified my code in order to make you understand what I want as better as possible, I'm sorry if it's not clear enough (and don't hesitate to ask me anything about it). Thanks!


Solution

  • Use a shared LinkedBlockingQueue<Integer> (among the readers and all writers) to let every writer signal the reader that the commitIndex variable has been modified:

    Writers:

    if (commitIndex is updated) {
        // wake up reader
        this.queue.add(commitIndex);
    }
    

    Reader:

    public void run() {
        while (true) {
    
            // take() puts this thread to sleep until a writer calls add()
            int commitIndex = this.queue.take();
    
            // now the reader is awaken!
            while (lastApplied < commitIndex) {
                // do something with log.get(lastApplied)
                lastApplied++;
            }
        }
    }
    

    Here I've used the attribute queue, which should correspond to the same instance of the LinkedBlockingQueue, for the reader and all writers.

    Note: Exception handling left as an excercise.