Search code examples
spring, rabbitmq, spring-rabbit

Automatic acknowledgement in @RabbitListener does not work when message processing takes a long time


I am using RabbitMQ to run several Spring Batch jobs. Execution takes a long time, up to 10 minutes per job. After all the work has been performed, the automatic acknowledgement in the @RabbitListener method does not take effect and the work starts again. If I reduce the processing time, everything works well. How can this be fixed? Important: the jobs complete correctly and without exceptions!

Configuration

/**
 * Spring configuration declaring the RabbitMQ beans used by the application: the queue, the
 * direct exchange and the binding between them, the JSON message converter with its mapper,
 * the connection factory and the AMQP template.
 *
 * <p>Every connection and topology setting is injected from a {@code spring.rabbitmq.*}
 * property.
 */
@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.queue-name}")
    private String queueName;

    @Value("${spring.rabbitmq.exchange-name}")
    private String exchangeName;

    @Value("${spring.rabbitmq.routing-key}")
    private String routingKey;

    /** Declares the queue named by {@code spring.rabbitmq.queue-name}. */
    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    /** Declares the direct exchange named by {@code spring.rabbitmq.exchange-name}. */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    /** Binds {@link #queue()} to {@link #exchange()} using the configured routing key. */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange boundExchange = exchange();
        return BindingBuilder.bind(boundQueue).to(boundExchange).with(routingKey);
    }

    /** Message converter that serialises payloads as JSON through {@link #jsonMapper()}. */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        JsonMapper mapper = jsonMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    /**
     * JSON mapper with the {@code JavaTimeModule} registered, {@code StdDateFormat} as its date
     * format and {@code WRITE_DATES_AS_TIMESTAMPS} switched off.
     */
    @Bean
    public JsonMapper jsonMapper() {
        JsonMapper mapper = new JsonMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.setDateFormat(new StdDateFormat());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

    /** Caching connection factory configured from the injected host, port, vhost and credentials. */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }

    /** AMQP template built on {@link #connectionFactory()} that uses the JSON message converter. */
    @Bean
    public AmqpTemplate rabbitTemp() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jackson2JsonMessageConverter());
        return template;
    }

}

@RabbitListener (for example, with from = to.minusDays(6) everything works well)

 /**
     * Handles a new-field message: maps it to a polygon, saves it, and then starts one job per
     * entry of {@code evalScriptTypeList} for the window from six years before now until now.
     *
     * <p>Job-launch exceptions are logged and not rethrown, so a failed launch for one script
     * type does not prevent the remaining types from being started.
     *
     * @param newField the field payload received from the queue
     */
    @RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD)
    public void processQueue(FieldDto newField) {
        PolygonDto polygonDto = mapper.map(newField);
        PolygonDto save = polygonService.save(polygonDto);

        LocalDateTime to = DateUtil.now();
        LocalDateTime from = to.minusYears(6);

        evalScriptTypeList.forEach(type -> {
            try {
                startJob(save, to, from, type);
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                // Pass the exception itself (not e.getCause()) so its own stack trace is logged
                // even when it has no cause.
                logger.error(e.getMessage(), e);
            }
        });

    }

    /**
     * Builds the job parameters for one polygon / script-type pair and runs
     * {@code uploadSatelliteImageJob} with them through {@code jobLauncher}.
     *
     * @param save the saved polygon whose id is passed to the job
     * @param to   end of the date window; also stored under the {@code "t"} parameter
     * @param from start of the date window
     * @param type script type supplying the type name and script version parameters
     */
    private void startJob(PolygonDto save, LocalDateTime to, LocalDateTime from, EvalScriptType type)
            throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {

        String dateTo = to.toString();

        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("t", dateTo);
        builder.addString(JOB_PARAMETER_DATE_TO, dateTo);
        builder.addString(JOB_PARAMETER_DATE_FROM, from.toString());
        builder.addString(JOB_PARAMETER_NEW_POLYGON_ID, save.getId());
        builder.addString(JOB_PARAMETER_EVAL_SCRIPT_TYPE, type.toString());
        builder.addString(JOB_PARAMETER_VERSION_SCRIPT, type.getVersion());

        jobLauncher.run(uploadSatelliteImageJob, builder.toJobParameters());
    }

I do not know whether manual acknowledgement would work, but maybe there is a way to fix the automatic acknowledgement?


Solution

  • I used manual acknowledgement

        /**
         * Handles a new-field message with manual acknowledgement.
         *
         * <p>The delivery is acknowledged before the long-running work starts, so the message is
         * settled no matter how long processing takes. Because the delivery is already settled,
         * a processing failure is only logged: rejecting the same delivery tag after it has been
         * acked is a channel-level protocol error.
         *
         * @param newField the field payload received from the queue
         * @param channel  the channel the message was delivered on, used to acknowledge it
         * @param tag      the delivery tag of the message
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD, ackMode = "MANUAL")
        public void processQueue(FieldDto newField, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            // Settle the delivery exactly once, up front.
            channel.basicAck(tag, false);

            try {
    
                //your code
    
            } catch (Exception e) {
                // No basicReject here: the tag was already acked above. Log the exception
                // itself (not e.getCause()) so its stack trace is kept.
                logger.error(e.getMessage(), e);
            }
        }