java spring queue executor

Spring TaskExecutor Implementation Queue Prioritization


The application I am working on receives notifications from external systems, which I want to process.

So far I have the following implementation:

    public class AsynchronousServiceImpl implements AsynchronousService {

        private TaskExecutor taskExecutor;

        @Override
        public void executeAsynchronously(Runnable task) {
            taskExecutor.execute(task);
        }

        @Required
        public void setTaskExecutor(TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }
    }

Spring configuration (I only need one thread, because legacy issues that are hard to change prevent me from processing the notifications in parallel):

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="1"/>
        <property name="waitForTasksToCompleteOnShutdown" value="true"/>
    </bean>

This is how I then submit a task:

    asynchronousService.executeAsynchronously(new Runnable() {
        @Override
        public void run() {
            someMethod.processNotification(notification);
        }
    });

The Notification object contains a timestamp field, and I want to prioritize the notifications in the queue by this field. (I think Spring uses an unbounded queue by default, which is fine for me, since I need an unbounded queue.)

Can I integrate this into my Spring application without implementing it manually from scratch? I want to sort the tasks (Runnable objects) in the queue by the timestamp field of the notification object (the object I pass to the processNotification method).


Solution

  • ThreadPoolTaskExecutor is backed by a BlockingQueue, which it creates in createQueue:

    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        else {
            return new SynchronousQueue<Runnable>();
        }
    }
    

    If you want to order your tasks, override this method so that it returns a priority queue:

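    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    public class YourPool extends ThreadPoolTaskExecutor {
        @Override
        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
            // PriorityBlockingQueue is always unbounded; its int argument is only an initial
            // array size, so don't pass queueCapacity (Integer.MAX_VALUE by default) to it.
            return new PriorityBlockingQueue<>();
        }
    }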
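
To see the effect of swapping the queue without starting a Spring context, here is a minimal sketch that uses a plain java.util.concurrent.ThreadPoolExecutor, which is what ThreadPoolTaskExecutor wraps. The names PriorityQueueDemo, StampedTask and runDemo are made up for this illustration, and the long timestamps are an assumption about what the notifications carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityQueueDemo {

    // Hypothetical task: a lower timestamp means it should run earlier.
    static final class StampedTask implements Runnable, Comparable<StampedTask> {
        private final long timestamp;
        private final List<Long> log;

        StampedTask(long timestamp, List<Long> log) {
            this.timestamp = timestamp;
            this.log = log;
        }

        @Override
        public void run() {
            log.add(timestamp); // record the order in which tasks actually ran
        }

        @Override
        public int compareTo(StampedTask other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<Long> runDemo() {
        List<Long> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        // One worker thread plus a priority queue, mirroring corePoolSize = maxPoolSize = 1.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        try {
            // The first task is handed straight to the worker and keeps it busy,
            // so the following tasks pile up in the queue.
            executor.execute(() -> awaitQuietly(gate));
            executor.execute(new StampedTask(300L, log));
            executor.execute(new StampedTask(100L, log));
            executor.execute(new StampedTask(200L, log));
            gate.countDown(); // release the worker; it now drains the queue by priority
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The tasks are submitted as 300, 100, 200 but run in timestamp order, because the single worker always takes the smallest element from the queue. Note that a task which is handed directly to an idle worker never passes through the queue, so ordering only applies to tasks that are actually waiting.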
    

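    Use YourPool instead of ThreadPoolTaskExecutor as the class of the taskExecutor bean. The tasks you submit must then be comparable, because the queue orders them by their natural ordering: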

    public class YourTask implements Runnable, Comparable<YourTask> {
        private Notification notification;
    
        public YourTask(Notification notification) {
            this.notification = notification;
        }
        @Override
        public void run() {
            someMethod.processNotification(notification);
        }
    
        @Override
        public int compareTo(YourTask other) {
            // Here you implement the priority: the earliest timestamp is taken first
            return notification.getTimestamp().compareTo(other.notification.getTimestamp());
        }
    }
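
One thing to be aware of with compareTo: PriorityBlockingQueue makes no ordering guarantee for elements that compare as equal, so two notifications with the same timestamp may be processed in either order. If that matters, a common approach (also suggested in the PriorityBlockingQueue Javadoc) is to break ties with a sequence number. The following is only a sketch under that assumption; SequencedTask, its label field and drainOrder are hypothetical names for illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class FifoTieBreakDemo {

    // Hypothetical task: ordered by timestamp first, then by creation order.
    static final class SequencedTask implements Runnable, Comparable<SequencedTask> {
        private static final AtomicLong COUNTER = new AtomicLong();

        private final long timestamp;
        private final String label;
        private final long sequence = COUNTER.getAndIncrement();

        SequencedTask(long timestamp, String label) {
            this.timestamp = timestamp;
            this.label = label;
        }

        @Override
        public void run() {
            // The notification would be processed here.
        }

        @Override
        public int compareTo(SequencedTask other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            // Equal timestamps fall back to first-created, first-served.
            return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
        }
    }

    // Adds three tasks and returns their labels in the order the queue hands them out.
    static String drainOrder() {
        PriorityBlockingQueue<SequencedTask> queue = new PriorityBlockingQueue<>();
        queue.add(new SequencedTask(200L, "c"));
        queue.add(new SequencedTask(100L, "a"));
        queue.add(new SequencedTask(100L, "b"));

        StringBuilder order = new StringBuilder();
        SequencedTask next;
        while ((next = queue.poll()) != null) {
            order.append(next.label);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

Here "a" and "b" share a timestamp, and the sequence number makes sure "a" (created first) comes out before "b".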
    

    Then submit your task:

    asynchronousService.executeAsynchronously(new YourTask(notificationX));
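
A caveat worth checking in your own setup: this works because AsynchronousServiceImpl calls execute, which puts the Runnable into the queue as it is. As far as I know, submit wraps the task in a FutureTask, which is not Comparable, so a priority queue without a comparator rejects it with a ClassCastException. The sketch below shows this on a plain ThreadPoolExecutor; ComparableTask and submitFailsWithClassCast are names invented for the illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitVersusExecuteDemo {

    // Hypothetical comparable task with an int priority.
    static final class ComparableTask implements Runnable, Comparable<ComparableTask> {
        private final int priority;

        ComparableTask(int priority) {
            this.priority = priority;
        }

        @Override
        public void run() {
            // Nothing to do; only the queuing behaviour matters here.
        }

        @Override
        public int compareTo(ComparableTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Returns true if submit() is rejected by the priority queue.
    static boolean submitFailsWithClassCast() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        try {
            // execute(): the first task starts the single worker thread.
            executor.execute(new ComparableTask(2));
            // submit(): the task is wrapped in a FutureTask before it is queued,
            // and the queue cannot cast that wrapper to Comparable.
            executor.submit(new ComparableTask(1));
            return false;
        } catch (ClassCastException expected) {
            return true;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submit rejected: " + submitFailsWithClassCast());
    }
}
```

So if you ever need a Future back, you would have to supply your own comparable wrapper or a Comparator for the queue; with plain execute, as in the question, YourTask is enough.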