I have two functions which must run in a critical section:
public synchronized void f1() { ... }
public synchronized void f2() { ... }
Assume that the behavior is as follows:
- f1 is almost never called. In fact, under normal conditions this method is never called. If f1 is called anyway, it should return quickly.
- f2 is called at a very high rate. It returns very quickly.
In other words, there is very low contention. So when f2 is called, we pay some overhead to acquire the lock, which is granted immediately in 99.9% of cases. I am wondering whether there are approaches that avoid this overhead.
I came up with the following alternative:
private final AtomicInteger lock = new AtomicInteger(0);

public void f1() {
    while (!lock.compareAndSet(0, 1)) {}
    try {
        ...
    } finally {
        lock.set(0);
    }
}

public void f2() {
    while (!lock.compareAndSet(0, 2)) {}
    try {
        ...
    } finally {
        lock.set(0);
    }
}
Are there other approaches? Does the java.util.concurrent package offer something natively?
Update
Although I intend this to be a generic question, here is some information about my situation:
f1: This method creates a new remote stream if, for some reason, the current one becomes corrupt, for example due to a timeout. A remote stream can be thought of as a socket connection that consumes a remote queue starting from a given location:
private Stream stream;

// Returns the replacement stream, so the return type is Stream rather than void.
public synchronized Stream f1() {
    final Stream stream = new Stream(...);
    if (this.stream != null) {
        stream.setPosition(this.stream.getPosition());
    }
    this.stream = stream;
    return stream;
}
f2: This method advances the stream position. It is a plain setter:
public synchronized void f2(Long p) {
    stream.setPosition(p);
}
Here, stream.setPosition(Long) is implemented as a plain setter as well:
public class Stream {
    private volatile Long position = 0L;

    public void setPosition(Long position) {
        this.position = position;
    }
}
In Stream, the current position is sent to the server periodically and asynchronously. Note that Stream is not implemented by me.
My idea was to introduce compare-and-swap as illustrated above, and to mark stream as volatile.
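For concreteness, combining the spin lock above with a volatile stream field could look roughly like the sketch below. The nested Stream class is a simplified, hypothetical stand-in for the real third-party class (it uses a primitive long and has no constructor arguments), and Thread.onSpinWait() assumes Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasGuardedStream {
    // Hypothetical stand-in for the third-party Stream class.
    static final class Stream {
        private volatile long position = 0L;

        void setPosition(long position) {
            this.position = position;
        }

        long getPosition() {
            return position;
        }
    }

    // 0 = free, 1 = held by f1, 2 = held by f2.
    private final AtomicInteger lock = new AtomicInteger(0);
    private volatile Stream stream;

    // Rare path: replace the stream, carrying over the old position.
    public Stream f1() {
        while (!lock.compareAndSet(0, 1)) {
            Thread.onSpinWait(); // hint to the CPU that we are busy-waiting
        }
        try {
            final Stream fresh = new Stream();
            final Stream old = stream;
            if (old != null) {
                fresh.setPosition(old.getPosition());
            }
            stream = fresh;
            return fresh;
        } finally {
            lock.set(0);
        }
    }

    // Hot path: advance the position of the current stream, if there is one.
    public void f2(long p) {
        while (!lock.compareAndSet(0, 2)) {
            Thread.onSpinWait();
        }
        try {
            final Stream s = stream;
            if (s != null) {
                s.setPosition(p);
            }
        } finally {
            lock.set(0);
        }
    }
}
```

Note that f2 still performs one CAS and one volatile write per call, so it is not obvious that this is any cheaper than an uncontended synchronized block.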
From the description and your example code, I've inferred the following:
- Stream has its own internal position, and you're also tracking the most recent position externally. You use this as a sort of 'resume point': when you need to reinitialize the stream, you advance it to this point.
- The last known position may be stale; I'm assuming this based on your assertion that the stream periodically and asynchronously notifies the server of its current position.
- At the time f1 is called, the stream is known to be in a bad state.
- The functions f1 and f2 access the same data, and may run concurrently. However, neither f1 nor f2 will ever run concurrently against itself. In other words, you almost have a single-threaded program, except for the rare cases when both f1 and f2 are executing.
[Side note: My solution doesn't actually care if f1 gets called concurrently with itself; it only cares that f2 is not called concurrently with itself.]
If any of this is wrong, then the solution below is wrong. Heck, it might be wrong anyway, either because of some detail left out, or because I made a mistake. Writing low-lock code is hard, which is exactly why you should avoid it unless you've observed an actual performance issue.
static class Stream {
    private long position = 0L;

    void setPosition(long position) {
        this.position = position;
    }
}

final static class StreamInfo {
    final Stream stream = new Stream();
    volatile long resumePosition = -1;

    final void setPosition(final long position) {
        stream.setPosition(position);
        resumePosition = position;
    }
}
private final Object updateLock = new Object();
private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());

void f1() {
    synchronized (updateLock) {
        final StreamInfo oldInfo = currentInfo.getAndSet(null);
        final StreamInfo newInfo = new StreamInfo();
        if (oldInfo != null && oldInfo.resumePosition > 0L) {
            newInfo.setPosition(oldInfo.resumePosition);
        }
        // Only `f1` modifies `currentInfo`; publish the new stream last,
        // once it has been fully set up.
        currentInfo.set(newInfo);
        // The `f2` thread might be waiting for us, so wake it up.
        updateLock.notifyAll();
    }
}
void f2(final long newPosition) {
    while (true) {
        final StreamInfo s = acquireStream();
        s.setPosition(newPosition);
        s.resumePosition = newPosition;
        // Make sure the stream wasn't replaced while we worked.
        // If it was, run again with the new stream.
        if (acquireStream() == s) {
            break;
        }
    }
}

private StreamInfo acquireStream() {
    // Optimistic concurrency: hope we get a stream that's ready to go.
    // If we fail, branch off into a slower code path that waits for it.
    final StreamInfo s = currentInfo.get();
    return s != null ? s : acquireStreamSlow();
}

private StreamInfo acquireStreamSlow() {
    synchronized (updateLock) {
        while (true) {
            final StreamInfo s = currentInfo.get();
            if (s != null) {
                return s;
            }
            try {
                updateLock.wait();
            }
            catch (final InterruptedException ignored) {
            }
        }
    }
}
If the stream has faulted and is being replaced by f1, it is possible that an earlier call to f2 is still performing some operations on the (now defunct) stream. I'm assuming this is okay, and that it won't introduce undesirable side effects (beyond those already present in your lock-based version). I make this assumption because we've already established in the list above that your resume point may be stale, and we also established that f1 is only called once the stream is known to be in a bad state.
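To make that handoff concrete, here is a condensed, self-contained variation on the same idea that you can run to watch a writer thread race against repeated replacements. It is only a sketch: HandoffDemo, acquire and resumePosition() are names I made up for the demo, and StreamInfo is reduced to just the resume point with no real stream behind it. Unlike the code above, it also restores the interrupt flag instead of swallowing it.

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandoffDemo {
    // Reduced stand-in: only the externally tracked resume point.
    static final class StreamInfo {
        volatile long resumePosition = -1L;
    }

    private final Object updateLock = new Object();
    private final AtomicReference<StreamInfo> currentInfo =
            new AtomicReference<>(new StreamInfo());

    // Rare path: swap in a replacement, carrying over the last known position.
    void f1() {
        synchronized (updateLock) {
            final StreamInfo oldInfo = currentInfo.getAndSet(null);
            final StreamInfo newInfo = new StreamInfo();
            if (oldInfo != null && oldInfo.resumePosition > 0L) {
                newInfo.resumePosition = oldInfo.resumePosition;
            }
            currentInfo.set(newInfo);
            updateLock.notifyAll();
        }
    }

    // Hot path: write, then re-read the reference and retry if it changed.
    void f2(final long newPosition) {
        while (true) {
            final StreamInfo s = acquire();
            s.resumePosition = newPosition;
            if (acquire() == s) {
                return;
            }
        }
    }

    private StreamInfo acquire() {
        StreamInfo s = currentInfo.get();
        if (s != null) {
            return s; // fast path: no lock taken
        }
        synchronized (updateLock) {
            boolean interrupted = false;
            try {
                while ((s = currentInfo.get()) == null) {
                    try {
                        updateLock.wait();
                    } catch (final InterruptedException e) {
                        interrupted = true; // remember it and keep waiting
                    }
                }
                return s;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    long resumePosition() {
        return acquire().resumePosition;
    }

    public static void main(String[] args) throws InterruptedException {
        final HandoffDemo demo = new HandoffDemo();
        final Thread writer = new Thread(() -> {
            for (long p = 1; p <= 100_000; p++) {
                demo.f2(p);
            }
        });
        writer.start();
        for (int i = 0; i < 100; i++) {
            demo.f1(); // replace the "stream" while the writer is running
        }
        writer.join();
        System.out.println(demo.resumePosition());
    }
}
```

The re-check in f2 is what keeps the last write from being lost: if f1 swapped the reference after f2 read it, the second acquire() returns a different object and f2 repeats the write against the replacement.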
Based on my JMH benchmarks, this approach is around 3x faster than the CAS or synchronized versions (which perform about the same as each other).