Search code examples
concurrencyjava-8lockingjava.util.concurrent

High Throughput writes on a variable in Java 8 Concurrency?


If I have a simple Integer in a Java 8 program that is readable and writeable by multiple threads.

If I’m told the app needs to support high throughput reads and very few writes - the answer to this is pretty simple, I just use a read write lock. Then multiple threads can perform reads concurrently with no blocking - and blocking only occurs when an infrequent write is done.

But in the case if I’m told the app needs to support high throughput writes (ie the shared variable is being updated frequently by different threads). No matter what sort of a lock I use here, as far as I can see it will always result in blocking of threads - in that when a thread gets a lock on the variable and updates it, the remaining threads that are also trying to update the variable will just have to wait till they get the lock - is this correct or am I missing something in Java 8?

I could go off and write some sort of an asynchronous update method on the shared variable, where the thread calls the update method it returns immediately and I use some sort of a data structure under the covers to queue the writes to the shared variable. At least this way I would prevent the threads from blocking when trying to update the shared variable. Granted this approach would raise other issues like should the thread assume its guaranteed the write def. succeeded or should I provide a call back to inform the update succeeded etc. Other than something like this, I see no way round the blocking when using any Lock in Java 8 for high throughput writes? (Or should I just accept the blocking and just use the Lock anyway even in the case of high throughput writes). Thanks


Solution

  • Strictly speaking of Integer - you could use LongAdder, it's implementation is exactly for your case it seems. If you care here are some additional details.

    It uses CAS (compare and swap) under the hood, much like AtomicLong, but with a few differences. First of all the actual long value that it holds is wrapped in a so called Cell - basically a class the allows to cas (compare and swap) the value to a new value, much like a setter if you want. This Cell is also annotated with @sun.misc.Contended to prevent false-sharing; here is the explanation for it (from the code comments):

    But Atomic objects residing in arrays will tend to be placed adjacent to each other, and so will most often share cache lines (with a huge negative performance impact) without this precaution.

    The implementation is very interesting from here. Let's look at what happens when you call add(long x) method:

     public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    

    The idea is that, if Cell [] cs is null, there was no contention before, meaning long value is either not initialized or all previous CAS operations have succeeded by all threads. In this case, try to CAS the new value to long value - if that worked, we are done. If that failed though, there is an array of Cell [] created, so that each separate thread tries to work in it's own space, minimizing contention.

    The next sentence is what you really care about if I understood your question correctly (and it's mine, does not come from code comments at all):

    In simpler words: if there is no contention between threads, the work is done as if AtomicLong is used (sort of), otherwise try to create a separate space for each thread to work on.

    If you care some additional details I found interesting:

    The Cell[] is always a power of two (much like HashMap internal array); then each thread uses ThreadLocalRandom to create some hashCode to try to find an entry in the array Cell [] cs to write to, or even re-hashing again using Marsaglia XorShif to try to find a free slot in this array; size of the array is capped to the numbers of cores you have (nearest power of two from that actually), this array can be resized, so it can grow and all these operations are done using a volatile int cellsBusy spin lock. This code is superb, but as said, I don't get all of it.