java concurrency executorservice java.util.concurrent backpressure

Handle back-pressure in FixedThreadPool


How do I deal with back-pressure in Java using a thread pool?

How can I reject new tasks so that there are never more than N submitted tasks? N is the maximum number of tasks allowed in the submission queue, which includes new, running, and paused (not finished) tasks.

Use case

Users submit calculation tasks that run for some time. Sometimes many users submit tasks at the same time. How can I reject new tasks if N tasks are already submitted?

In other words, the total number of submitted tasks that have not finished (started or not started) must not exceed N.

Example code

Here is the full version, and below are short snippets.

A long running task. CalculationTask.

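public class CalculationTask {
    // Assumed declarations and values; the original snippet omits them.
    private static final Random RANDOM = new Random(); // java.util.Random
    private static final int MIN_WAIT_TIME_MS = 1000;
    private static final int MAX_WAIT_TIME_MS = 2000;
    private final String name;

    public CalculationTask(final String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public CalculationResult calculate() {
        final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
        try {
            Thread.sleep(waitTimeMs); // simulate a long-running calculation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // nextInt(bound) is never negative; Math.abs(Integer.MIN_VALUE) is.
        final int result = RANDOM.nextInt(Integer.MAX_VALUE);
        final String text = "This is result: " + result;
        final CalculationResult calculationResult = new CalculationResult(name, text, result);
        System.out.println("Calculation finished: " + calculationResult);
        return calculationResult;
    }
}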

Its result. CalculationResult.

public class CalculationResult {

    private final String taskName;
    private final String text;
    private final Integer number;
    // Constructor, getters, toString (the fields are final, so there are no setters).
}

This is how I submit jobs. CalculationBroker.

public class CalculationBroker {

    private static final int MAX_WORKERS_NUMBER = 5;

    private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
    private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();

    public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
        final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
        if (calculationResultCached != null) {
            return CompletableFuture.completedFuture(calculationResultCached);
        }

        System.out.println("Calculation submitted: " + calculationTask.getName());

        final CompletableFuture<CalculationResult> calculated = CompletableFuture
                .supplyAsync(calculationTask::calculate, executorService);
        calculated.thenAccept(this::updateCache);
        return calculated;
    }

    private void updateCache(final CalculationResult calculationResult) {
        calculationCache.put(calculationResult.getTaskName(), calculationResult);
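    }

    // Assumed implementation; Main calls close(), but the original snippet omits it.
    public void close() {
        executorService.shutdown();
    }
}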

And this is how I run them together. Main.

public class Main {

    public static void main(String[] args) {
        final int N_TASKS = 100;
        final CalculationBroker calculationBroker = new CalculationBroker();
        final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();
        for (int i = 0; i < N_TASKS; i++) {
            final CalculationTask calculationTask = createCalculationTask(i);
            final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
                    calculationBroker.submit(calculationTask);
            completableFutures.add(calculationResultCompletableFuture);
        }

        calculationBroker.close();
    }

    private static CalculationTask createCalculationTask(final int counter) {
        return new CalculationTask("CalculationTask_" + counter);
    }
}

This is the output.

2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
2020-05-23 14:14:54 [pool-1-thread-3] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
2020-05-23 14:14:55 [pool-1-thread-1] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
2020-05-23 14:14:56 [pool-1-thread-5] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}
...

My findings.

Executors.newFixedThreadPool(n) creates a ThreadPoolExecutor backed by a LinkedBlockingQueue, which is unbounded by default, so tasks are never rejected.

Another post suggests code that is equivalent to Executors.newFixedThreadPool(n), except that it replaces the unbounded LinkedBlockingQueue with an ArrayBlockingQueue of fixed capacity 100. If 100 tasks are already queued (and n are being executed), a new task is rejected with a RejectedExecutionException.

As that post suggests:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
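
To see the rejection happen, here is a minimal sketch that is not part of the original post. BoundedPoolDemo and countRejected are hypothetical names, and the tasks block on a latch only so that the pool stays saturated while submissions come in. It relies on the default rejection policy of ThreadPoolExecutor (AbortPolicy), which throws RejectedExecutionException from execute().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    // Submits `submissions` tasks that all block until released,
    // and returns how many of them the executor rejected.
    static int countRejected(int poolSize, int queueCapacity, int submissions) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < submissions; i++) {
            try {
                executor.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                // All threads are busy and the queue is full.
                rejected++;
            }
        }
        release.countDown();  // let the accepted tasks finish
        executor.shutdown();  // queued tasks still run, then the threads exit
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 2 running + 3 queued = 5 accepted, so 5 of 10 are rejected.
        System.out.println("Rejected: " + countRejected(2, 3, 10)); // prints "Rejected: 5"
    }
}
```

With 2 threads and a queue capacity of 3, the executor accepts 5 tasks in total, which is the "pool size plus queue capacity" bound the question is after.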

Solution

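  • You answered your own question: you can use the queue size to do that. With a bounded queue, at most poolSize + queueSize tasks are in flight at once, so for a limit of N set queueSize = N - poolSize.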

    int poolSize = ...;
    int queueSize = ...;
    CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();
    
    ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(queueSize),
        handler);
    

    You could use CustomRejectedExecutionHandler to handle rejected tasks.

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import org.apache.log4j.Logger;
    
    public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    
        public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);
    
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            LOGGER.error(runnable.toString() + " execution rejected.");
        }
    }
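
One caveat when combining this with the CompletableFuture.supplyAsync call from the question: a handler that only logs drops the task silently, so the future returned for that task would never complete. A possible alternative, sketched below under stated assumptions, keeps the default AbortPolicy and turns the RejectedExecutionException into a failed future that the caller can inspect. BackPressureBroker, maxInFlight and countRejected are hypothetical names, not from the original code, and the sketch assumes N is larger than the pool size so that the queue has at least one slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BackPressureBroker {

    private final ExecutorService executor;

    // maxInFlight plays the role of N: running plus queued tasks.
    public BackPressureBroker(int poolSize, int maxInFlight) {
        if (maxInFlight <= poolSize) {
            // Assumption of this sketch: the queue needs at least one slot.
            throw new IllegalArgumentException("maxInFlight must exceed poolSize");
        }
        this.executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxInFlight - poolSize)); // default AbortPolicy
    }

    // Returns a future that has already failed if the pool is saturated.
    public <T> CompletableFuture<T> submit(Supplier<T> task) {
        try {
            return CompletableFuture.supplyAsync(task, executor);
        } catch (RejectedExecutionException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public void close() {
        executor.shutdown();
    }

    // Demo helper: submits blocking tasks and counts the rejected ones.
    static int countRejected(int poolSize, int maxInFlight, int submissions) {
        BackPressureBroker broker = new BackPressureBroker(poolSize, maxInFlight);
        CountDownLatch release = new CountDownLatch(1);
        Supplier<String> task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        };
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < submissions; i++) {
            futures.add(broker.submit(task));
        }
        int rejected = 0;
        for (CompletableFuture<String> future : futures) {
            if (future.isCompletedExceptionally()) {
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        broker.close();
        return rejected;
    }

    public static void main(String[] args) {
        // N = 5 with 2 threads: 5 tasks accepted, 5 rejected.
        System.out.println("Rejected: " + countRejected(2, 5, 10)); // prints "Rejected: 5"
    }
}
```

A caller can then report the rejection to the user, for example with calculated.exceptionally(...), instead of waiting on a future that never completes.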