java spring queue executor

Spring TaskExecutor unbounded queue


I use Spring's TaskExecutor (the equivalent of the JDK 1.5 Executor) to process notifications received from external systems.

Interface with only one method:

public interface AsynchronousService {
    void executeAsynchronously(Runnable task);
}

and the corresponding implementation:

public class AsynchronousServiceImpl implements AsynchronousService {

    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

XML configuration of the task executor (legacy application):

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="1"/>
    <!--<property name="queueCapacity" value="100"/>-->
    <property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>

Both corePoolSize and maxPoolSize are set to 1 because I want the tasks to execute sequentially: the pool creates a single thread, which processes the tasks one at a time.

I want to order my tasks by the date on which I received the notification, therefore I override createQueue to allow priority ordering:

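import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class NotificationPool extends ThreadPoolTaskExecutor {
    // queueCapacity is used here as the initial capacity of the priority queue
    @Override
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return new PriorityBlockingQueue<>(queueCapacity);
    }
}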

Here is the Notification task class:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {

    private final NotificationService notificationService;
    private final Notification notification;

    public NotificationTask(NotificationService notificationService,
                            Notification notification) {
        this.notificationService = notificationService;
        this.notification = notification;
    }

    @Override
    public int compareTo(NotificationTask task) {
        return notification.getTimestamp().compareTo(task.notification.getTimestamp());
    }

    @Override
    public void run() {
        notificationService.processNotification(notification);
    }
}

And this is how I execute it:

asynchronousService.executeAsynchronously(new NotificationTask(notificationService, notification));
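
To make the intended behaviour concrete, here is a minimal JDK-only sketch of the same idea: a single-threaded ThreadPoolExecutor backed by a PriorityBlockingQueue runs queued tasks in timestamp order. The names PriorityExecutorSketch, TimestampedTask, and runInTimestampOrder are hypothetical stand-ins and not part of the application above; the latch only exists to keep the worker busy so that all tasks are queued before any of them runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityExecutorSketch {

    // Hypothetical stand-in for NotificationTask: ordered by timestamp only.
    static final class TimestampedTask implements Runnable, Comparable<TimestampedTask> {
        private final long timestamp;
        private final List<Long> processed;

        TimestampedTask(long timestamp, List<Long> processed) {
            this.timestamp = timestamp;
            this.processed = processed;
        }

        @Override
        public int compareTo(TimestampedTask other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public void run() {
            processed.add(timestamp); // record the order in which tasks actually ran
        }
    }

    static List<Long> runInTimestampOrder(long... timestamps) throws InterruptedException {
        // One worker thread, so tasks run sequentially; the queue decides the order.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        List<Long> processed = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // The first task is handed straight to the new worker thread (it is not queued),
        // so it does not need to be Comparable. It blocks the worker until the gate opens.
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // These tasks are queued while the worker is blocked.
        for (long timestamp : timestamps) {
            executor.execute(new TimestampedTask(timestamp, processed));
        }

        gate.countDown();    // release the worker
        executor.shutdown(); // already queued tasks still run after shutdown()
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInTimestampOrder(30L, 10L, 20L));
    }
}
```

Note that the sketch uses execute, not submit: with a plain ThreadPoolExecutor, submit wraps the task in a FutureTask, which is not Comparable, so a PriorityBlockingQueue without a comparator would reject it with a ClassCastException.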

This works fine with a bounded queue, however I need an unbounded queue. As you can see in the XML configuration, the line that defines the queue capacity is commented out:

<!--<property name="queueCapacity" value="100"/>-->

However, if I do this I get an OutOfMemoryError. It seems that it tries to allocate the unbounded queue right at application startup. The ExecutorService does allow unbounded queues, but I don't know how to configure one here.


Solution

  • According to the Javadoc for PriorityBlockingQueue:

    An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).

    So basically, the queue is unbounded, and you don't need to set a maximum size for it.

    However, the queue is backed by an array, and that array is allocated up front with the initial capacity passed to the constructor:

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        // The backing array is allocated here, at the full initial capacity
        this.queue = new Object[initialCapacity];
    }
    

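    When queueCapacity is left unset, ThreadPoolTaskExecutor passes its default of Integer.MAX_VALUE to createQueue, so the constructor above tries to allocate an array of that size and fails at startup. So you need to provide a reasonable initial queue size; the queue then grows on its own when it fills up. After that, you're done.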

    However, as the Javadoc states, if elements keep arriving faster than they are consumed, the queue can still throw an OutOfMemoryError when the internal array is full and it tries to grow it:

    private void tryGrow(Object[] array, int oldCap)
    

    But that is very unlikely unless your application produces millions of notifications in a short time.
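
One way to apply this is to cap the value used as the initial capacity, so that the Integer.MAX_VALUE default never reaches the PriorityBlockingQueue constructor. The following is a minimal JDK-only sketch under stated assumptions: QueueCapacitySketch, initialCapacityFor, newPriorityQueue, and the ceiling of 1024 are hypothetical names and values chosen for illustration. In the NotificationPool from the question, the same clamp would go into the overridden createQueue.

```java
import java.util.concurrent.PriorityBlockingQueue;

class QueueCapacitySketch {

    // Assumed ceiling for the array allocated up front; tune it for your workload.
    static final int MAX_INITIAL_CAPACITY = 1024;

    // Clamp the configured capacity into [1, MAX_INITIAL_CAPACITY].
    // The constructor rejects values below 1 with IllegalArgumentException.
    static int initialCapacityFor(int queueCapacity) {
        return Math.max(1, Math.min(queueCapacity, MAX_INITIAL_CAPACITY));
    }

    // The queue stays logically unbounded; only the initial allocation is limited.
    static <T> PriorityBlockingQueue<T> newPriorityQueue(int queueCapacity) {
        return new PriorityBlockingQueue<>(initialCapacityFor(queueCapacity));
    }

    public static void main(String[] args) {
        // Integer.MAX_VALUE is what arrives when queueCapacity is not configured.
        PriorityBlockingQueue<Integer> queue = newPriorityQueue(Integer.MAX_VALUE);

        // Add more elements than the initial capacity; the queue grows as needed.
        for (int i = 5000; i > 0; i--) {
            queue.add(i);
        }
        System.out.println("size=" + queue.size() + ", head=" + queue.peek());
    }
}
```

The initial capacity is only a starting size for the backing array, so a modest value costs little: the queue still accepts elements beyond it and reports Integer.MAX_VALUE as its remaining capacity.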