Tags: java, multithreading, concurrency, lock-free, optimistic-concurrency

Detecting concurrent calls of a method


I have a class Serializer:

class Serializer<T> implements Consumer<T> {
    final Consumer<? super T> actual;
    // constructor omitted for brevity
    @Override public synchronized void accept(T t) {
        actual.accept(t);
    }
}

The purpose is to make sure the actual consumer is run from a single thread at a time. However, calling a callback while holding a lock is generally dangerous, so instead of holding a lock, callers queue up the incoming value and one of the threads goes in, drains the queue and calls the actual consumer in a loop, sequentially. (The other limitation is that the number of concurrent callers is not known.)

    final ConcurrentLinkedQueue<T> queue;
    final AtomicInteger wip; // work-in-progress counter
    @Override public void accept(T t) {
        queue.offer(t);
        if (wip.getAndIncrement() == 0) { // this thread becomes the drainer
            do {
                actual.accept(queue.poll());
            } while (wip.decrementAndGet() > 0);
        }
    }

This works, and setting aside the problems with the unbounded queue, thread hopping and a thread getting stuck in the loop, benchmarking gives 10% of the throughput of a direct method call in the single-threaded case. When I implement this queueing/emitting with a synchronized block, the benchmark gives 50% of the direct case because the JVM optimizes away the synchronization; that would be great, but it doesn't scale as well. Using a java.util.concurrent Lock scales but suffers a similar single-threaded throughput degradation as the code above. If I'm not mistaken, once the JVM optimizes synchronized away, it still has to keep some guard in place in case the method gets invoked concurrently again, and then reinstate the locking.
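For reference, one way the synchronized queueing/emitting variant could look is sketched below. This is an illustration of the idea described above (the lock is only held while touching the queue, never while calling the actual consumer), not necessarily the exact code that was benchmarked; `SyncSerializer` and its field names are made up for the sketch:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch: serialize calls to `actual` using synchronized blocks around the
// queue only; the drainer thread calls the consumer outside the lock.
class SyncSerializer<T> implements Consumer<T> {
    final Consumer<? super T> actual;
    final Queue<T> queue = new ArrayDeque<>();
    boolean emitting; // guarded by `this`

    SyncSerializer(Consumer<? super T> actual) { this.actual = actual; }

    @Override public void accept(T t) {
        synchronized (this) {
            queue.offer(t);
            if (emitting) return; // another thread is already draining
            emitting = true;      // this thread becomes the drainer
        }
        for (;;) {
            T v;
            synchronized (this) {
                v = queue.poll();
                if (v == null) { emitting = false; return; }
            }
            actual.accept(v); // called outside the lock
        }
    }
}
```

In the uncontended case every `accept` still takes the monitor twice, which is what the JVM's lock optimizations can largely elide in a single-threaded benchmark.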

My question is: how can I achieve a similar effect with a lock, queue or other serialization logic, i.e., have a cheap and fast path for the case when there is no concurrent call going on and another path for the concurrent case, so that the code scales and remains fast for single-threaded use as well?


Solution

  • As others have said, synchronized already gives you a fairly efficient tool, so switching to something different should not be motivated by the hope of better performance.

    However, if your intention is not to block callers, as you said in your question, it is legitimate (though this might mean living with lower performance).

    The first thing I see when looking at your attempt using the queue and the atomic integer is that it is possible to bypass the queue when there are no pending items and no other consumer is running. In the low-contention case, that might reduce the overhead:

    final ConcurrentLinkedQueue<T> queue;
    final AtomicInteger wip;
    @Override public void accept(T t) {
      if (wip.compareAndSet(0, 1)) { // no contention?
        actual.accept(t);            // fast path: bypass the queue
        if (wip.decrementAndGet() == 0) return; // still no contention
      }
      else {
        if (!queue.offer(t))
          throw new AssertionError("queue should be unbounded");
        if (wip.getAndIncrement() != 0) return; // other consumer running
      }
      do { // drain items queued by concurrent callers
        actual.accept(queue.poll());
      } while (wip.decrementAndGet() > 0);
    }