Tags: java, concurrency, reentrantreadwritelock

Fair ReadWriteLock with Conditions


First of all, I checked previous questions about this topic; however, none of them fits my specific problem.

I have the following code, which models sensors with a timestamp and data stored in a double array, plus an instance of my FairRWLock implementation.

class LockedSensors implements Sensors {
    long time = 0;
    double data[];
    FairRWLock lock = new FairRWLock();

    LockedSensors() {
        time = 0;
    }

    // store data and timestamp
    // if and only if data stored previously is older (lower timestamp)
    public void update(long timestamp, double[] data) {
        lock.writeAcquire();
            if (timestamp > time) {
                if (this.data == null)
                    this.data = new double[data.length];
                time = timestamp;
                for (int i = 0; i < data.length; ++i)
                    this.data[i] = data[i];
            }
        lock.writeRelease();
    }

    // pre: val != null
    // pre: val.length matches length of data written via update
    // if no data has been written previously, return 0
    // otherwise return current timestamp and fill current data to array passed
    // as val
    public long get(double val[]) {
        try{
            lock.readAcquire();
                if (time == 0) return 0;
                for (int i = 0; i < data.length; ++i)
                    val[i] = data[i];
                return time;
        } finally{lock.readRelease();}
    }
}

It supports update, which only takes effect when the new data carries a newer timestamp, and get, which extracts the data currently stored in the sensor.
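
The Sensors interface itself is not shown in the question; the following is a minimal sketch of what it presumably looks like, with the method signatures inferred from the class above (only the interface name is taken from the code, everything else is an assumption):

// hypothetical reconstruction of the Sensors interface, inferred from the methods LockedSensors implements
interface Sensors {
    // store data and timestamp, but only if the previously stored data is older
    void update(long timestamp, double[] data);

    // copy the current data into val and return its timestamp, or 0 if nothing has been stored yet
    long get(double[] val);
}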

This is my implementation of the FairRWLock:

class FairRWLock{
    private int readers = 0, writers = 0, readersWaiting = 0, writersWaiting = 0, writersWait = 0;
    private static final int ReaderPriority = 30;
    private Lock lock = new ReentrantLock();
    private Condition readerPass = lock.newCondition();
    private Condition writerPass = lock.newCondition();
    /*
     * readers denotes the number of current readers, writers equivalent, readersWaiting denotes the number of readers
     * awaiting their signal, writersWaiting equivalent. writersWait denotes the number of readers the writers have to
     * let pass before they can proceed, this amount is controlled by the ReaderPriority (reset occurs when writer releases)
     */


    /*
     * increment the number of waiting readers, check if there are any currently working writers OR writers are waiting
     * whilst they don't have to let any readers pass. When signaled, decrement readersWaiting, decrement the number of
     * readers the writers have to let pass and increment the number of current readers.
     */
    public void readAcquire(){
        lock.lock();
            readersWaiting++;
            while(writers > 0 || (writersWaiting > 0 && writersWait <= 0)){
                try {
                    readerPass.await();
                } catch (InterruptedException e) {}
            }
            readersWaiting--;
            writersWait--;
            readers++;
        lock.unlock();
    }

    /*
     * simply decrement number of readers and signal the threads that have to be signaled
     */
    public void readRelease(){
        lock.lock();
            readers--;
            signaling();
        lock.unlock();
    }

    /*
     * increment number of waiting writers, check if there are currently working writers OR readers OR readers currently
     * have priority over the writers. When signaled decrement writersWaiting, increment number of writers
     */
    public void writeAcquire(){
        lock.lock();
            writersWaiting++;
            while(writers > 0 || readers > 0 || (readersWaiting > 0 && writersWait > 0)){
                try{
                    writerPass.await();
                } catch(InterruptedException e) {}
            }
            writersWaiting--;
            writers++;
        lock.unlock();
    }

    /*
     * simply decrement number of current writers, reset the number of readers the writers have to let pass before
     * another writer may pass. signal the ones that should be
     */
    public void writeRelease(){
        lock.lock();
            writers--;
            writersWait = ReaderPriority;
            signaling();
        lock.unlock();
    }

    /*
     * first check whether readers currently have priority over the writers. If so: (readersWaiting > 0) ? signal all
     * waiting readers : signal one writer. If not: (writersWaiting > 0) ? signal one writer : signal all waiting readers.
     */
    private void signaling(){
        if(writersWait > 0){
            if(readersWaiting > 0) readerPass.signalAll();
            else writerPass.signal();
        } else{
            if(writersWaiting > 0) writerPass.signal();
            else readerPass.signalAll();
        }
    }
}

I'm not very familiar with locking via conditions, and it seems my code suffers from either starvation or even deadlock. However, I can't find the issue (which most probably is somewhere in the FairRWLock implementation).


Solution

  • There is no sense in trying to build a fair lock atop an unfair lock. As soon as threads enter either readAcquire() or writeAcquire(), they call lock.lock(), and if that doesn’t succeed immediately, they may be put into the wait state and overtaken by an arbitrary number of threads before they can proceed.
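
    For illustration only (this is not part of the original answer): the ReentrantLock created inside FairRWLock uses the default, non-fair mode. The JDK does offer fair variants, constructed as in the sketch below, although, as argued next, fairness is not what this use case actually needs.

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockConstructionSketch {
        ReentrantLock unfair = new ReentrantLock();                        // default: non-fair, lock() may barge past longer-waiting threads
        ReentrantLock fair = new ReentrantLock(true);                      // fair mode: roughly FIFO ordering of acquisitions
        ReentrantReadWriteLock fairRW = new ReentrantReadWriteLock(true);  // the JDK's read-write lock also offers a fair mode
    }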

    At this point, it is already impossible to reestablish fairness by whatever you do afterwards. But it’s worth noting that you are also missing the implications of await(). This operation releases the lock temporarily, as only that gives other threads a chance to fulfill the condition you are awaiting. When the thread gets signal()ed, it has to re-acquire the lock, which is, again, not a fair operation. An arbitrary number of threads may make new lock requests, changing the situation entirely before the thread that called await() long ago can proceed.
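
    A minimal, self-contained sketch of those await() mechanics (the class, field, and method names here are made up for illustration):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class AwaitMechanics {
        private final Lock lock = new ReentrantLock();
        private final Condition ready = lock.newCondition();
        private boolean conditionHolds;   // placeholder guard state

        void awaitCondition() throws InterruptedException {
            lock.lock();
            try {
                while(!conditionHolds) {
                    // await() atomically releases 'lock' and suspends the thread; after a
                    // signal()/signalAll(), it must re-acquire 'lock' before returning, and that
                    // re-acquisition is not fair either, so other threads may have changed the
                    // state in the meantime -- hence the surrounding while loop
                    ready.await();
                }
                // 'lock' is held again here and the guard has just been re-checked
            } finally {
                lock.unlock();
            }
        }

        void signalCondition() {
            lock.lock();
            try {
                conditionHolds = true;
                ready.signalAll();  // waiters wake up one by one as each re-acquires 'lock'
            } finally {
                lock.unlock();
            }
        }
    }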

    In the end, you do not want fairness. The update operation is intended to ignore outdated updates, so it would actually be a win if a newer update request could proceed faster, as the pending older ones then become no-ops. For concurrent get requests, you don’t want blocking at all: all read requests should be able to run concurrently, but, of course, you want consistency (thread safety) and no writer starvation.

    The best solution is to do no locking at all and implement the entire operation lock-free:

    import java.util.concurrent.atomic.AtomicReference;

    class LockedSensors implements Sensors {
        // immutable snapshot: a timestamp plus a defensive copy of the data array
        private static final class State {
            final long time;
            final double[] data;
            State(long t, double[] in) {
                time = t;
                data = in.clone();
            }
        }
        final AtomicReference<State> current = new AtomicReference<>();
        LockedSensors() {}
    
        // store data and timestamp
        // if and only if data stored previously is older (lower timestamp)
        public void update(long timestamp, double[] data) {
            State newState = null;
            for(;;) {
                State old = current.get();
                // a newer state is already stored; this update becomes a no-op
                if(old != null && old.time > timestamp) return;
                // create the replacement lazily, only when it is actually needed
                if(newState == null) newState = new State(timestamp, data);
                // publish atomically; retry if another update won the race in-between
                if(current.compareAndSet(old, newState)) return;
            }
        }
    
        // pre: val != null
        // pre: val.length matches length of data written via update
        // if no data has been written previously, return 0
        // otherwise return current timestamp and fill current data to array passed as val
        public long get(double[] val) {
            State actual = current.get();
            if(actual == null) return 0;
            if(actual.data.length != val.length)
                throw new IllegalArgumentException();
            System.arraycopy(actual.data, 0, val, 0, actual.data.length);
            return actual.time;
        }
    }
    

    Here, readers can always proceed, returning the result of the last completed update, while never blocking any writers. Even writers do not block each other; they may have to retry if another update happened in between, but since every writer returns as soon as it encounters a newer timestamp and there is always at least one writer making progress, this is not a problem.
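
    A minimal usage sketch of the lock-free version under concurrent access (the thread counts, timestamps, and sensor values are made up for illustration, and the Sensors interface sketched earlier is assumed):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class LockFreeSensorsDemo {
        public static void main(String[] args) throws InterruptedException {
            LockedSensors sensors = new LockedSensors();
            ExecutorService pool = Executors.newFixedThreadPool(4);

            // two writers racing with possibly out-of-order timestamps
            for(int w = 0; w < 2; w++) {
                pool.execute(() -> {
                    for(int i = 1; i <= 1_000; i++)
                        sensors.update(System.nanoTime(), new double[] { i, i * 0.5 });
                });
            }

            // two readers polling concurrently; they never block the writers
            for(int r = 0; r < 2; r++) {
                pool.execute(() -> {
                    double[] snapshot = new double[2];
                    for(int i = 0; i < 1_000; i++) {
                        long t = sensors.get(snapshot);  // 0 means nothing has been stored yet
                        if(t != 0)
                            System.out.println(t + " -> " + snapshot[0] + ", " + snapshot[1]);
                    }
                });
            }

            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }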