Tags: java, concurrency, java.util.concurrent

Replace synchronized with atomic+while loop in case of low lock contention


I have two functions which must run in a critical section:

public synchronized void f1() { ... }
public synchronized void f2() { ... }

Assume that the behavior is as follows:

  • f1 is almost never called. Actually, under normal conditions, this method is never called. If f1 is called anyway, it should return quickly.
  • f2 is called at a very high rate. It returns very quickly.
  • These methods never call each other, and there is no reentrancy either.

In other words, there is very low contention. So when f2 is called, we pay some overhead to acquire the lock, which is granted immediately in 99.9% of cases. I am wondering whether there are approaches that avoid this overhead.

I came up with the following alternative:

private final AtomicInteger lock = new AtomicInteger(0);

public void f1() {
    while (!lock.compareAndSet(0, 1)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

public void f2() {
    while (!lock.compareAndSet(0, 2)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

Are there other approaches? Does the java.util.concurrent package offer something natively?

update

Although I intend this to be a generic question, here is some information about my situation:

f1: This method creates a new remote stream if, for some reason, the current one becomes corrupt, for example due to a timeout. A remote stream can be thought of as a socket connection which consumes a remote queue starting from a given location:

private Stream stream;

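public synchronized void f1() {
    // Create the replacement stream (constructor arguments elided).
    final Stream newStream = new Stream(...);

    // Carry over the position of the old stream, if there was one.
    if (this.stream != null) {
        newStream.setPosition(this.stream.getPosition());
    }
    this.stream = newStream;
}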

f2: This method advances the stream position. It is a plain setter:

public synchronized void f2(Long p) {
    stream.setPosition(p);
}

Here, stream.setPosition(Long) is implemented as a plain setter as well:

public class Stream {
    private volatile Long position = 0L;

    public void setPosition(Long position) {
        this.position = position;
    }
}

In Stream, the current position is sent to the server periodically and asynchronously. Note that I did not implement Stream myself.

My idea was to introduce compare-and-swap as illustrated above, and mark stream as volatile.


Solution

  • From the description and your example code, I've inferred the following:

    1. Stream has its own internal position, and you're also tracking the most recent position externally. You use this as a sort of 'resume point': when you need to reinitialize the stream, you advance it to this point.

    2. The last known position may be stale; I'm assuming this based on your assertion that the stream periodically and asynchronously notifies the server of its current position.

    3. At the time f1 is called, the stream is known to be in a bad state.

    4. The functions f1 and f2 access the same data, and may run concurrently. However, neither f1 nor f2 will ever run concurrently against itself. In other words, you almost have a single-threaded program, except for the rare cases when both f1 and f2 are executing.

      [Side note: My solution doesn't actually care if f1 gets called concurrently with itself; it only cares that f2 is not called concurrently with itself]

    If any of this is wrong, then the solution below is wrong. Heck, it might be wrong anyway, either because of some detail left out, or because I made a mistake. Writing low-lock code is hard, which is exactly why you should avoid it unless you've observed an actual performance issue.

    static class Stream {
        private long position = 0L;
    
        void setPosition(long position) {
            this.position = position;
        }
    }
    
    final static class StreamInfo {
        final Stream stream = new Stream();
        volatile long resumePosition = -1;
    
        final void setPosition(final long position) {
            stream.setPosition(position);
            resumePosition = position;
        }
    }
    
    private final Object updateLock = new Object();
    private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());
    
    void f1() {
        synchronized (updateLock) {
            final StreamInfo oldInfo = currentInfo.getAndSet(null);
            final StreamInfo newInfo = new StreamInfo();
    
            if (oldInfo != null && oldInfo.resumePosition > 0L) {
                newInfo.setPosition(oldInfo.resumePosition);
            }
    
            // Only `f1` can modify `currentInfo`, so update it last.
            currentInfo.set(newInfo);
    
            // The `f2` thread might be waiting for us, so wake them up.
            updateLock.notifyAll();
        }
    }
    
    void f2(final long newPosition) {
        while (true) {
            final StreamInfo s = acquireStream();
    
            // StreamInfo.setPosition also records resumePosition.
            s.setPosition(newPosition);
    
            // Make sure the stream wasn't replaced while we worked.
            // If it was, run again with the new stream.
            if (acquireStream() == s) {
                break;
            }
        }
    }
    
    private StreamInfo acquireStream() {
        // Optimistic concurrency: hope we get a stream that's ready to go.
        // If we fail, branch off into a slower code path that waits for it.
        final StreamInfo s = currentInfo.get();
        return s != null ? s : acquireStreamSlow();
    }
    
    private StreamInfo acquireStreamSlow() {
        synchronized (updateLock) {
            while (true) {
                final StreamInfo s = currentInfo.get();
    
                if (s != null) {
                    return s;
                }
    
                try {
                    updateLock.wait();
                }
                catch (final InterruptedException ignored) {
                }
            }
        }
    }
    

    If the stream has faulted and is being replaced by f1, it is possible that an earlier call to f2 is still performing some operations on the (now defunct) stream. I'm assuming this is okay, and that it won't introduce undesirable side effects (beyond those already present in your lock-based version). I make this assumption because we've already established in the list above that your resume point may be stale, and we also established that f1 is only called once the stream is known to be in a bad state.
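
    To make the hand-off concrete, here is a minimal, self-contained sketch of the same idea that can be compiled and run on its own. It is a sketch under the assumptions listed above, not a drop-in implementation. The class name StreamSwapDemo, the getPosition() accessor, the currentPosition() helper, and the stand-in Stream class are all hypothetical and exist only for the demo; the real Stream is third-party code I haven't seen. One f2 thread advances the position while the main thread calls f1 repeatedly.

```java
import java.util.concurrent.atomic.AtomicReference;

class StreamSwapDemo {
    // Hypothetical stand-in for the third-party Stream.
    static class Stream {
        private volatile long position = 0L;
        void setPosition(long position) { this.position = position; }
        long getPosition() { return position; }
    }

    static final class StreamInfo {
        final Stream stream = new Stream();
        volatile long resumePosition = -1;

        void setPosition(long position) {
            stream.setPosition(position);
            resumePosition = position;
        }
    }

    private final Object updateLock = new Object();
    private final AtomicReference<StreamInfo> currentInfo =
            new AtomicReference<>(new StreamInfo());

    // Rare path: replace the stream, carrying over the last known position.
    void f1() {
        synchronized (updateLock) {
            StreamInfo oldInfo = currentInfo.getAndSet(null);
            StreamInfo newInfo = new StreamInfo();
            if (oldInfo != null && oldInfo.resumePosition > 0L) {
                newInfo.setPosition(oldInfo.resumePosition);
            }
            currentInfo.set(newInfo);
            updateLock.notifyAll();
        }
    }

    // Hot path: no lock unless a replacement is in progress.
    void f2(long newPosition) {
        while (true) {
            StreamInfo s = acquireStream();
            s.setPosition(newPosition);
            // Retry if f1 swapped the stream while we were writing.
            if (acquireStream() == s) {
                return;
            }
        }
    }

    private StreamInfo acquireStream() {
        StreamInfo s = currentInfo.get();
        if (s != null) {
            return s;
        }
        // Slow path: wait until f1 has published the new stream.
        boolean interrupted = false;
        synchronized (updateLock) {
            while ((s = currentInfo.get()) == null) {
                try {
                    updateLock.wait();
                } catch (InterruptedException e) {
                    interrupted = true; // remember it, keep waiting
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag
        }
        return s;
    }

    // Demo-only helper: position of whichever stream is current.
    long currentPosition() {
        return acquireStream().stream.getPosition();
    }

    public static void main(String[] args) throws InterruptedException {
        StreamSwapDemo demo = new StreamSwapDemo();
        Thread writer = new Thread(() -> {
            for (long p = 1; p <= 1_000_000; p++) demo.f2(p);
        });
        writer.start();
        for (int i = 0; i < 100; i++) demo.f1(); // simulate stream faults
        writer.join();
        System.out.println(demo.currentPosition()); // prints 1000000
    }
}
```

    The last position written by f2 survives every replacement: either f2 notices the swap and rewrites the position on the new stream, or f1 copies it over via resumePosition. This relies on only one thread ever calling f2, as assumed in point 4.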

    Based on my JMH benchmarks, this approach is around 3x faster than the CAS or synchronized versions (which perform about the same as each other).