Tags: java, asynchronous, atomic, concurrenthashmap

Is the incrementAndGet method of AtomicLong a blocking call?


I am working on multithreaded code and trying to measure how much time a particular method takes. I am benchmarking most of my teammates' code as part of load and performance testing, first of our client code and then of our service code.

For this performance measurement I am using:

System.nanoTime();

My code spawns multiple threads, and I am trying to measure how much time the code takes in each of them.

Below is a sample of how I measure the performance of any code. In this case I am measuring the

beClient.getAttributes method

Below is the code:

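import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BenchMarkTest {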

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(5);

        try {

            for (int i = 0; i < 3 * 5; i++) {
                executor.submit(new ThreadTask(i));
            }

            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
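        } catch (InterruptedException e) {
            // restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }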
    }

}

Below is the class that implements the Runnable interface:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class ThreadTask implements Runnable {
    private int id;
    public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();


    public ThreadTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {


        long start = System.nanoTime();

        // attributes, beClient and columnsList are not shown in this snippet
        attributes = beClient.getAttributes(columnsList);

        long end = System.nanoTime() - start;

        final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
        if (before != null) {
            before.incrementAndGet();
        }
    }
}

Whatever code I want to measure, I put the line below just before the method call:

long start = System.nanoTime();

And these lines right after the same call, using a different ConcurrentHashMap for each measured method:

long end = System.nanoTime() - start;

final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
if (before != null) {
    before.incrementAndGet();
}

Today I had a meeting with one of my senior colleagues, and he said that the incrementAndGet method of AtomicLong is a blocking call, so my thread will be waiting there for some time.

He asked me to make it an asynchronous call.

Is there any way to make that call asynchronous?

In all our client and service code I use the same lines shown above, before and after each method, to measure that method's performance. After the program finishes, I print the results from those maps.

So now I am thinking of making that call asynchronous. Can anyone help me do that?

Basically, I am trying to measure the performance of a particular method asynchronously, so that no thread has to wait or get blocked.

I think I can do this using Futures. Can anyone provide an example of that?

Thanks for the help.


Solution

  • The line:

    if (before != null) {
        before.incrementAndGet();
    }
    

    will hold up the current thread until before.incrementAndGet() completes and returns the long value (which you are not using). If you must know, there is in fact no lock to acquire: the method is a while(true) loop around a compare-and-swap, so the thread simply retries until its update succeeds.
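
    The retry loop mentioned above can be sketched as follows. This is a hand-written illustration built on AtomicLong.compareAndSet(), not the JDK's actual source, and the class name CasIncrementSketch is made up for the example. It shows that a thread never parks on a lock: it only re-reads the value and tries again when another thread won the race.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a hand-written equivalent of AtomicLong.incrementAndGet().
class CasIncrementSketch {

    // Retries until compareAndSet succeeds; the thread never waits on a lock.
    static long incrementAndGet(AtomicLong counter) {
        while (true) {
            long current = counter.get();
            long next = current + 1;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
            // Another thread changed the value in between: read again and retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicLong counter = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 100000; i++) {
                        incrementAndGet(counter);
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        // 4 threads x 100000 increments, none lost
        System.out.println(counter.get()); // prints 400000
    }
}
```

    Running main prints 400000, since none of the concurrent increments is lost even though no lock is taken.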

    You can make it asynchronous by calling that specific method in a Thread of its own, thus not blocking the current Thread.

    To do this, I believe you already know how: use Thread.start(), an ExecutorService or a FutureTask (check "How to asynchronously call a method in Java" on how to do it in an elegant fashion).

    In case I'm not clear, here's a solution using FutureTask:

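    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;

    public class BenchMarkTest {
    
        // FutureTask.get() throws checked exceptions, so main declares them
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            ExecutorService executor = Executors.newFixedThreadPool(5);
    
            int threadNum = 2;
            ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
            // synchronized wrapper: several worker threads add to this list concurrently
            List<FutureTask<Long>> taskList =
                    Collections.synchronizedList(new ArrayList<FutureTask<Long>>());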
    
            try {
    
                for (int i = 0; i < 3 * 5; i++) {
                    executor.submit(new ThreadTask(i, taskExecutor, taskList));
                }
    
                executor.shutdown();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
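            } catch (InterruptedException e) {
                // restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }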
    
            for (FutureTask<Long> futureTask : taskList) {
                futureTask.get(); // doing a job similar to joining threads
            }
            taskExecutor.shutdown();
        }
    
    }
    

    ThreadTask class:

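    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.atomic.AtomicLong;

    class ThreadTask implements Runnable {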
        private int id;
        public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
    
        private ExecutorService taskExecutor;
        private List<FutureTask<Long>> taskList;    
    
        public ThreadTask(int id, ExecutorService taskExecutor, List<FutureTask<Long>> taskList) {
            this.id = id;
            this.taskExecutor = taskExecutor;
            this.taskList = taskList;
        }
    
        @Override
        public void run() {
    
    
            long start = System.nanoTime();
    
            attributes = beClient.getAttributes(columnsList);
    
            long end = System.nanoTime() - start;
    
            final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
            if (before != null) {
                FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
                    public Long call() {
                        return before.incrementAndGet();
                    }
                });
                taskList.add(futureTask);
                taskExecutor.execute(futureTask);
            }
        }
    }
    

    Update:

    I thought of a little possible improvement: Instead of telling the taskExecutor to execute the futureTask in the ThreadTask class, it may be better to postpone the tasks' executions to the end of the main method. I mean:

    Remove the line below from ThreadTask.run():

                taskExecutor.execute(futureTask);
    

    And, in the main() method, where you have:

            for (FutureTask<Long> futureTask : taskList) {
                futureTask.get(); // doing a job similar to joining threads
            }
            taskExecutor.shutdown();
    

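    Add the execution of the tasks. Note that invokeAll() accepts only a collection of Callable objects, and FutureTask does not implement Callable, so each task is passed to execute() instead, thus having:

            for (FutureTask<Long> futureTask : taskList) {
                taskExecutor.execute(futureTask);
            }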
            for (FutureTask<Long> futureTask : taskList) {
                futureTask.get(); // doing a job similar to joining threads
            }
            taskExecutor.shutdown();
    

    (Also, you can remove ThreadTask's ExecutorService field, as it will no longer use it.)

    This way, there is very little overhead while you are executing the benchmark: the only extra work on the measured threads is creating a FutureTask and adding it to the taskList.

    Full updated code:

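    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;

    public class BenchMarkTest {
    
        // FutureTask.get() throws checked exceptions, so main declares them
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            ExecutorService executor = Executors.newFixedThreadPool(5);
    
            // synchronized wrapper: several worker threads add to this list concurrently
            List<FutureTask<Long>> taskList =
                    Collections.synchronizedList(new ArrayList<FutureTask<Long>>());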
    
            try {
    
                for (int i = 0; i < 3 * 5; i++) {
                    executor.submit(new ThreadTask(i, taskList));
                }
    
                executor.shutdown();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
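            } catch (InterruptedException e) {
                // restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }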
    
            int threadNum = 2;
            ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
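            // invokeAll() accepts only Callables; a FutureTask is run via execute()
            for (FutureTask<Long> futureTask : taskList) {
                taskExecutor.execute(futureTask);
            }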
            for (FutureTask<Long> futureTask : taskList) {
                futureTask.get(); // doing a job similar to joining threads
            }
            taskExecutor.shutdown();
        }
    
    }
    

    ThreadTask class:

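    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.atomic.AtomicLong;

    class ThreadTask implements Runnable {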
        private int id;
        public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
    
        private List<FutureTask<Long>> taskList;    
    
        public ThreadTask(int id, List<FutureTask<Long>> taskList) {
            this.id = id;
            this.taskList = taskList;
        }
    
        @Override
        public void run() {
    
    
            long start = System.nanoTime();
    
            attributes = beClient.getAttributes(columnsList);
    
            long end = System.nanoTime() - start;
    
            final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
            if (before != null) {
                FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
                    public Long call() {
                        return before.incrementAndGet();
                    }
                });
                taskList.add(futureTask);
            }
        }
    }
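
    The snippets above depend on beClient, columnsList and attributes, which are not shown, so they cannot be run as they stand. Below is a self-contained sketch of the same deferred-increment idea. The class name DeferredHistogramSketch, the runBenchmark() helper and simulatedCall() (a Thread.sleep stand-in for beClient.getAttributes) are assumptions made for illustration only, and the list is wrapped with Collections.synchronizedList because several worker threads add to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DeferredHistogramSketch {

    static final ConcurrentHashMap<Long, AtomicLong> selectHistogram =
            new ConcurrentHashMap<Long, AtomicLong>();

    // Hypothetical stand-in for beClient.getAttributes(columnsList).
    static void simulatedCall(int id) throws InterruptedException {
        Thread.sleep(1 + (id % 3));
    }

    // Times `calls` simulated calls on 5 threads, defers the increments,
    // then runs them and returns the total number of recorded calls.
    static long runBenchmark(int calls) {
        selectHistogram.clear();
        final List<FutureTask<Long>> taskList =
                Collections.synchronizedList(new ArrayList<FutureTask<Long>>());
        ExecutorService executor = Executors.newFixedThreadPool(5);
        ExecutorService taskExecutor = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < calls; i++) {
                final int id = i;
                executor.submit(new Runnable() {
                    public void run() {
                        long start = System.nanoTime();
                        try {
                            simulatedCall(id);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        long elapsedMillis = (System.nanoTime() - start) / 1000000L;
                        final AtomicLong before =
                                selectHistogram.putIfAbsent(elapsedMillis, new AtomicLong(1L));
                        if (before != null) {
                            // Defer the increment instead of doing it on this thread.
                            taskList.add(new FutureTask<Long>(new Callable<Long>() {
                                public Long call() {
                                    return before.incrementAndGet();
                                }
                            }));
                        }
                    }
                });
            }
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);

            // All measurements are done; now run the deferred increments.
            for (FutureTask<Long> task : taskList) {
                taskExecutor.execute(task);
            }
            for (FutureTask<Long> task : taskList) {
                task.get(); // wait for each increment to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during benchmark", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("deferred increment failed", e);
        } finally {
            taskExecutor.shutdown();
        }

        long total = 0;
        for (AtomicLong count : selectHistogram.values()) {
            total += count.get();
        }
        return total;
    }

    public static void main(String[] args) {
        long total = runBenchmark(15);
        // Buckets are elapsed milliseconds; a TreeMap prints them in order.
        System.out.println(new TreeMap<Long, AtomicLong>(selectHistogram));
        System.out.println("recorded calls: " + total);
    }
}
```

    The bucket values depend on timing and will differ between runs, but the counts should always add up to the number of submitted calls (15 in main), because the first hit on a bucket is counted by putIfAbsent and every later hit by a deferred increment.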