I am doing a Spring Batch application that have manager and worker using remote partitioning with ActiveMQ Arterims. I want my worker only consume the message from queue when its finished the remote step. However, once there are messages on queue they will be consumed by the listener immediately. How can I do modify the listener to only one task executor or one consumer per worker? I was referring the Spring office document as below configuration.
I was focusing on where the worker listener being configurated. Even when I changed the concurrency to 1-1 it's not working. The worker still will consume all the message immediately.
Here is the manager configuration:
@Bean
public DirectChannel arequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow aoutboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows.from(arequests())
.handle(Jms.outboundAdapter(connectionFactory).destination("testingReq"))
.get();
}
@Bean
public DirectChannel breplies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow binboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("testingRes"))
.channel(breplies())
.get();
}
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.partitioner("workerStep", new BasicPartitioner())
.gridSize(GRID_SIZE)
.outputChannel(arequests())
.inputChannel(breplies())
.build();
}
@Bean
public Job remotePartitioningJob() {
return jobBuilderFactory.get("remotePartitioningJob")
.incrementer(new RunIdIncrementer())
.start(managerStep())
.build();
}
// And worker configuration:
@Bean
public DirectChannel crequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow cinboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("testingReq")
.configureListenerContainer(c -> c.concurrency("1-1")))
.log(LoggingHandler.Level.INFO, "Received Message", m -> "Received message: " + m.getPayload())
.channel(crequests())
.get();
}
@Bean
public DirectChannel dreplies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow doutboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows.from(dreplies())
.handle(Jms.outboundAdapter(connectionFactory).destination("testingRes"))
.get();
}
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory.get("workerStep")
.inputChannel(crequests())
.outputChannel(dreplies())
.tasklet(tasklet(null))
.build();
}
@Bean
@StepScope
public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
return (contribution, chunkContext) -> {
log.info("Started");
Thread.sleep(20000);
log.info("finished " + partition);
return RepeatStatus.FINISHED;
};
}
The acknowledgement mode should be AUTO. Also, I was trying to request the manager to launch a remote job with 10 partition and I can see there are 10 MESSAGE_COUNT
via the artemis queue stat
. After I starting the worker one the number of DELIVERING_COUNT
added 10 and the worker started to process the first partition, but after I started another worker two I was expecting it to process the second partition or the rest which did not happen.
One consumer will fetch enough messages to fill its consumerWindowSize
for flow control. I recommend setting consumerWindowSize=0
on your JMS connection URL, e.g.:
tcp://host:61616?consumerWindowSize=0