Search code examples
spring-batchspring-integrationspring-jms

Spring Batch partitioned job JMS acknowledgement


Let's say I have a Spring Batch remote partitioned job, i.e. I have a manager application instance which starts the job and partitions the work and I have multiple workers who are executing individual partitions.

The message channel where the partitions are sent to the workers is an ActiveMQ queue and the Spring Integration configuration is based on JMS.

Assume that I wanna make sure that in case of a worker crashing in the middle of the partition execution, I want to make sure that another worker will pick up the same partition.

I think here's where acknowledging JMS messages would come in handy to only acknowledge a message in case a worker has fully completed its work on a particular partition but it seems as soon as the message is received by a worker, the message is acknowledged right away and in case of failures in the worker Spring Batch steps, the message won't reappear (obviously).

Is this even possible with Spring Batch? I've tried transacted sessions too but it doesn't really work either.

I know how to achieve this with JMS API. The difficulty comes from the fact that there is a lot of abstraction with Spring Batch in terms of messaging, and I'm unable to figure it out.


Solution

  • I know how to achieve this with JMS API. The difficulty comes from the fact that there is a lot of abstraction with Spring Batch in terms of messaging, and I'm unable to figure it out.

    In this case, I think the best way to answer this question is to remove all these abstractions coming from Spring Batch (as well as Spring Integration), and try to see where the acknowledgment can be configured.

    In a remote partitioning setup, workers are listeners on a queue in which messages coming from the manager are of type StepExecutionRequest. The most basic code form of a worker in this setup is something like the following (simplified version of StepExecutionRequestHandler, which is configured as a Spring Integration service activator when using the RemotePartitioningWorkerStepBuilder):

    @Component
    public class BatchWorkerStep {
    
        @Autowired
        private JobRepository jobRepository;
    
        @Autowired
        private StepLocator stepLocator;
    
        @JmsListener(destination = "requests")
        public void receiveMessage(final Message<StepExecutionRequest> message) throws JMSException {
            StepExecutionRequest request = message.getObject();
            
            Long jobExecutionId = request.getJobExecutionId();
            Long stepExecutionId = request.getStepExecutionId();
            String stepName = request.getStepName();
    
            StepExecution stepExecution = jobRepository.getStepExecution(jobExecutionId, stepExecutionId);
            Step step = stepLocator.getStep(stepName);
            try {
                step.execute(stepExecution);
                stepExecution.setStatus(BatchStatus.COMPLETED);
            } catch (Throwable e) {
                stepExecution.addFailureException(e);
                stepExecution.setStatus(BatchStatus.FAILED);
            } finally {
                jobRepository.update(stepExecution); // this is needed in a setup where the manager polls the job repository
            }
        }
    }
    

    As you can see, the JMS message acknowledgment cannot be configured on the worker side (there is no way to do it with attributes of JmsListener, so it has to be done somewhere else. And this is actually at the message listener container level with DefaultJmsListenerContainerFactory#setSessionAcknowledgeMode.

    Now if you are using Spring Integration to configure the messaging middleware, you can configure the acknowledgment mode in Spring Integration .