Search code examples
javaexecutorserviceexecutor

I can not get RejectedExecutionException with ExecutorService


I create scheduller for test handling of RejectedExecutionException:

@Component
public class TestScheduler {

    private final TestService testService;
    private ExecutorService executorService;

    public TestScheduler(TestService testService) {
        this.testService = testService;
    }

    @PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(5);
    }

    @Scheduled(fixedRate = 10L)
    public void test() {
        System.out.println("test");
        executorService.execute(testService::print);
    }
}

And service with delay 70 seconds:

@Component
public class TestService {

    public void print() {
        System.out.println("print start");
        try {
            TimeUnit.SECONDS.sleep(70);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("print end");
    }
}

I wait next logic:

  1. Scheduler call executorService.execute(testService::print) 5 times
  2. Each testService::print will execute 70 seconds
  3. When execute method will call sixth time I get RejectedExecutionException

But I not get Exception. I have this logs:

test
print start
2018-10-22 11:26:45.543  INFO 5960 --- [           main] c.e.s.SchedullerExceptionsApplication    : Started SchedullerExceptionsApplication in 0.661 seconds (JVM running for 1.108)
test
print start
test
print start
test
print start
test
print start
test
...
70 seconds print test

EDIT

In real project I have this code:

@PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(100, new CustomizableThreadFactory("SendRequestExecutor-"));
    }

@Scheduled(fixedDelay = 1000L)
public void sendReady() {
    try {
        List<Message> messages = messageService.findReadyToSend();
        for (Message message : messages) {
            message.setStatus(IN_PROCESS);
            Message savedMessage = messageService.save(message);
            executorService.execute(() -> sendRequestService.send(savedMessage.getGuid()));
        }
    } catch (Exception e) {
        log.error("handle: " + e.getMessage());
    }
}

does it mean that this code is wrong? because that can happen so I will change entity to status IN_PROCESS and when try execute - if executorService full I do not get exception and executorService not execute my task?


Solution

  • There are two aspects involved when defining an executor.

    1. The number of threads that will be used by the executor. This puts a limit on the number of concurrent tasks the executor can run. This is what you are setting via Executors.newFixedThreadPool(5).
    2. The size of the underlying task submission queue. This puts a limit on the number of tasks the underlying queue can store until it will throw an exception. The executor created by newFixedThreadPool uses an unbounded queue, therefore you do not get an exception.

    You can achieve your desired behavior by creating your own executor service as follows and submitting 11 tasks to it (5 to use all the threads, 5 to fill the underlying task queue and 1 to overflow it).

    new ThreadPoolExecutor(5, 
                           5, 
                           2000L, 
                           TimeUnit.MILLISECONDS, 
                           new ArrayBlockingQueue<Runnable>(5, true), 
                           new ThreadPoolExecutor.CallerRunsPolicy());