Search code examples
javarate-limitingthread-synchronizationconcurrentmodification

Rate limiter based on the token bucket algorithm gives an incorrect total-requests-consumed value when tested with multiple threads


I have created a token-bucket-based rate limiter in Java and tested it with both a single-threaded and a multi-threaded approach using an ExecutorService. When I run it with a single thread, I get 200 as the total requests consumed when I set the rate to 20 requests per second. With the same configuration, when I run it with a multi-threaded approach using executors, I get 220 as the total requests consumed, even though I have used the synchronized keyword. I run both approaches for a total of 10 s.

Please find the two classes below:

BucketToken.class

package algo.interviewquestions;

/**
 * A simple token-bucket rate limiter.
 *
 * <p>The bucket starts full with {@code maxBucketSize} tokens. Each successful
 * {@link #tryConsume()} removes one token. Once at least {@code refillBucketRate}
 * milliseconds have passed since the last refill, the next call to
 * {@link #tryConsume()} tops the bucket back up to {@code maxBucketSize} before
 * attempting to consume. Refills are lazy: they happen only inside
 * {@link #tryConsume()}, and the following refill is scheduled relative to the
 * moment the refill was actually performed.
 *
 * <p>Thread-safety: every public method synchronizes on this instance, so counters
 * read through the getters are always consistent with the latest completed
 * {@link #tryConsume()} call.
 */
public class BucketToken {
    /** Capacity of the bucket; also the number of tokens restored by a refill. */
    private final int maxBucketSize;
    /** Minimum time between two refills, in nanoseconds. */
    private final long refillIntervalNanos;

    // All mutable state below is guarded by the monitor of `this`.
    private int tokensAvailable;
    /** {@link System#nanoTime()} reading at or after which the next refill is due. */
    private long nextRefillNanos;
    private long totalRequestsReceived;
    private long totalRequestsConsumed;
    private int bucketRefillCount;

    /**
     * Creates a full bucket.
     *
     * @param maxBucketSize    bucket capacity in tokens; must be non-null and not negative
     * @param refillBucketRate refill period in milliseconds; must be non-null and positive
     * @throws NullPointerException     if either argument is {@code null}
     * @throws IllegalArgumentException if {@code maxBucketSize} is negative or
     *                                  {@code refillBucketRate} is not positive
     */
    public BucketToken(Integer maxBucketSize, Long refillBucketRate) {
        java.util.Objects.requireNonNull(maxBucketSize, "maxBucketSize");
        java.util.Objects.requireNonNull(refillBucketRate, "refillBucketRate");
        if (maxBucketSize < 0) {
            throw new IllegalArgumentException("maxBucketSize must not be negative: " + maxBucketSize);
        }
        // A non-positive period would refill on every call and disable rate limiting entirely.
        if (refillBucketRate <= 0) {
            throw new IllegalArgumentException("refillBucketRate must be positive: " + refillBucketRate);
        }
        this.maxBucketSize = maxBucketSize;
        this.refillIntervalNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(refillBucketRate);
        this.tokensAvailable = maxBucketSize;
        // nanoTime is monotonic, so wall-clock adjustments cannot stall or trigger refills.
        this.nextRefillNanos = System.nanoTime() + this.refillIntervalNanos;
    }

    /** @return number of {@link #tryConsume()} calls that were granted a token */
    public synchronized Long getTotalRequestsConsumed() {
        return totalRequestsConsumed;
    }

    /** @return number of tokens currently left in the bucket */
    public synchronized Integer getTokensAvailable() {
        return tokensAvailable;
    }

    /** @return number of refills performed since construction */
    public synchronized Integer getBucketRefillCount() {
        return bucketRefillCount;
    }

    /** @return total number of {@link #tryConsume()} calls, granted or declined */
    public synchronized Long getTotalRequestsReceived() {
        return totalRequestsReceived;
    }

    /**
     * Tries to take one token from the bucket, refilling it first if a refill is due.
     *
     * @return {@code true} if a token was available and has been consumed,
     *         {@code false} if the bucket is empty
     */
    public synchronized boolean tryConsume() {
        totalRequestsReceived++;
        refill();
        if (tokensAvailable <= 0) {
            return false;
        }
        tokensAvailable--;
        totalRequestsConsumed++;
        return true;
    }

    /**
     * Restores the bucket to full capacity if the refill period has elapsed.
     * Must only be called while holding the monitor of {@code this}.
     */
    private void refill() {
        long now = System.nanoTime();
        // Compare via subtraction: nanoTime values may wrap, differences do not.
        if (now - nextRefillNanos < 0) {
            return;
        }
        nextRefillNanos = now + refillIntervalNanos;
        tokensAvailable = maxBucketSize;
        bucketRefillCount++;
    }

}

BucketTokenTest.class

package algo.interviewquestions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * Manual driver that hammers a {@link BucketToken} for a fixed time window, once from
 * a pool of worker threads and once from the main thread, then prints the counters
 * of both runs.
 */
public class BucketTokenTest {
    /** Length of each measurement window in milliseconds. */
    private static final long RUN_DURATION_MILLIS = 10000L;

    public static void main(String[] args) throws InterruptedException {
        BucketToken bucketToken = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed1 = runInMultiThreadedApproach(bucketToken);

        BucketToken bucketToken2 = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed2 = runWithSingleThread(bucketToken2);

        printRequestsStats(bucketToken, totalRequestsConsumed1);
        printRequestsStats(bucketToken2, totalRequestsConsumed2);
    }

    /**
     * Calls {@code tryConsume()} in a tight loop on the calling thread for the whole window.
     *
     * @return the number of requests the bucket accepted, as counted by the caller
     */
    private static AtomicInteger runWithSingleThread(BucketToken bucketToken) {
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        consumeUntilDeadline(bucketToken, totalRequestsConsumed, startTime);
        return totalRequestsConsumed;
    }

    /**
     * Prints the bucket's own counters next to the caller-side accepted count.
     * The two "Accepted" lines are expected to match: the first is the caller's
     * tally, the second is the bucket's internal counter.
     */
    private static void printRequestsStats(BucketToken bucketToken, AtomicInteger totalRequestsConsumed) {
        System.out.println("Total Requests Received = " + bucketToken.getTotalRequestsReceived());
        System.out.println("Total Requests Accepted = " + totalRequestsConsumed.get());
        System.out.println("Total Requests Accepted = " + bucketToken.getTotalRequestsConsumed());
        System.out.println("Bucket refill count = " + bucketToken.getBucketRefillCount());
    }

    /**
     * Runs the same request loop concurrently on a fixed pool of worker threads that
     * all share one window start time, and waits for every worker to finish.
     *
     * @return the number of requests the bucket accepted, summed over all workers
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static AtomicInteger runInMultiThreadedApproach(BucketToken bucketToken) throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(threadCount);
        try {
            for (int worker = 0; worker < threadCount; worker++) {
                executorService.submit(() -> {
                    try {
                        consumeUntilDeadline(bucketToken, totalRequestsConsumed, startTime);
                    } finally {
                        // Always release the latch, otherwise a failing worker would
                        // leave the main thread blocked in await() forever.
                        latch.countDown();
                    }
                });
            }
            long startTime2 = System.currentTimeMillis();
            latch.await();
            System.out.println("Completed in " + (System.currentTimeMillis() - startTime2) + "ms");
        } finally {
            // Shut the pool down even if await() is interrupted so the JVM can exit.
            executorService.shutdown();
        }
        return totalRequestsConsumed;
    }

    /**
     * Repeatedly calls {@code tryConsume()} on the current thread until
     * {@link #RUN_DURATION_MILLIS} have elapsed since {@code startTime}, logging each
     * outcome and counting accepted requests in {@code accepted}.
     */
    private static void consumeUntilDeadline(BucketToken bucketToken, AtomicInteger accepted, long startTime) {
        while (System.currentTimeMillis() - startTime < RUN_DURATION_MILLIS) {
            if (bucketToken.tryConsume()) {
                accepted.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " - Request Accepted");
            } else {
                System.out.println(Thread.currentThread().getName() + " - Request Declined");
            }
        }
    }
}

Can someone tell me what change I need to do for handling the concurrent modification or is there something I am missing?

I expect both the approaches single thread and multi-thread to give the same request consumed count in 10s which is 200.

I tried using the synchronized keyword on the methods where the modification is done, but I am still seeing different values.


Solution

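  • I finally found the issue with the problem I posted. The increment operator I was using in the bucket token was a postfix unary operator (++ and --), which was causing additional requests to be consumed because the token decrement was happening at the next iteration. This shows that we need to be very careful while using these postfix unary operators, or avoid them if we want to have consistent logic without handling the additional increment. (Note: used as standalone statements, `x++` and `x += 1` are equivalent in Java, so this rewrite by itself should not change the count. The extra 20 is more likely a timing effect: in the multi-threaded run the bucket is created a few milliseconds before the 10 s window starts, so a tenth refill can land just inside the window.) The updated synchronized block is as follows.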

    /**
     * Records the request, refills the bucket if due, and takes one token when available.
     * The whole sequence runs while holding {@code objectLock}.
     *
     * <p>NOTE(review): {@code objectLock} is declared outside this snippet; confirm that
     * {@code refill()} and the getters are guarded by the same lock. As standalone
     * statements, {@code x += 1} and {@code x++} behave identically.
     *
     * @return {@code true} if a token was consumed, {@code false} if none was available
     */
    public boolean tryConsume() {
        synchronized (objectLock) {
            totalRequestsReceived += 1;
            refill();
            boolean bucketEmpty = !(this.tokensAvailable > 0);
            if (bucketEmpty) {
                return false;
            }
            this.tokensAvailable -= 1;
            this.totalRequestsConsumed += 1;
            return true;
        }
    }