The application I am working on receives notifications from external systems, which I want to process.
Till now I have the following implementation:
public class AsynchronousServiceImpl implements AsynchronousService {
private TaskExecutor taskExecutor;
@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}
@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
spring configuration (I only need 1 thread since I don't want to execute the notifications in parallel due some legacy issues which are hard to change)
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
Here I execute then the code:
asynchronousService.executeAsynchronously(new Runnable() {
@Override
public void run() {
someMethod.processNotification(notification)
}
});
This Notification object which I have contains a timestamp field. I want to prioritize the notifications in the queue by this field (I think Spring uses as default an unbounded queue which is fine more me, since I need an unbounded queue)
Can I integrate this somehow in my spring application without implementing it manually from scratch? So I want to sort the taskss (runnable-objects) in the queue based on the timestamp field on the notification object.(It is that object that I am passing to the "processNotification" method)
ThreadPoolTaskExecutor
is backed by BlockingQueue
:
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<Runnable>(queueCapacity);
}
else {
return new SynchronousQueue<Runnable>();
}
}
If you want to order your task, you need to override this function to allow priority ordering:
public class YourPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}
Your submitted task must be comparable:
public class YourTask implements Runnable, Comparable<YourTask> {
private Notification notification;
public YourTask(Notification notification) {
this.notification = notification;
}
@Override
public void run() {
someMethod.processNotification(notification)
}
@Override
public int compareTo(B other) {
// Here you implement the priority
return notification.getTimestamp().compareTo(other.notification.getTimestamp());
}
}
Then submit your task:
asynchronousService.executeAsynchronously(new YourTask(notificationX));