Tags: spring, spring-amqp, spring-rabbit

spring-amqp zero consumer utilization with non thread-safe listener


We are seeing a problem in production where consumer utilization drops to zero, the queues keep growing, and performance degrades.

Each consumer is a listener container that holds a single instance of a non-thread-safe listener bean.

Each listener needs to write to its own set of files. To avoid thread contention, I want only one thread writing to each set of files.

Each container gets its own listener instance because the listener bean is declared with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE).

I'm using a configuration similar to the one in this question

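Each container is also configured with a retry advice, implemented as follows:

import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {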
    private static final int DEFAULT_RETRY_COUNT = 5;
    private static final int DEFAULT_BACKOFF_MS = 250;
    private int retryCount;
    private int backOffPeriodInMS;

    public RetryMessageAdvice() {
        this.retryCount = DEFAULT_RETRY_COUNT;
        this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
        initializeRetryPolicy();
    }

    public RetryMessageAdvice(int retryCount, int backoff) {
        this.retryCount = retryCount;
        this.backOffPeriodInMS = backoff;
        initializeRetryPolicy();
    }

    public void initializeRetryPolicy() {

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(this.retryCount);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(backOffPeriodInMS);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        this.setRetryOperations(retryTemplate);
        this.setMessageRecoverer(new RetryMessageRecoverer());
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
        // Rebuild the template so the new count actually takes effect.
        initializeRetryPolicy();
    }
}

The consumer looks something like this:

@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);

    private final ExportMapper exportMapper;
    private final ExportFormatter exportFormatter;

    @Autowired
    public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
            @Qualifier("exportMapper") ExportMapper exportMapper,
            @Value("${export.root.dir}") String exportDirectory) {
        super(exportDirectory);
        // Field names and types now match the declarations above.
        this.exportFormatter = exportFormatter;
        this.exportMapper = exportMapper;
    }

    @Override
    public void handle(AnalyticsEvent analyticsEvent) throws Exception {

        ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);

        File csvFile = getCsvFile(exportedEvent);
        String csvRow = exportFormatter.writeAsString(exportedEvent);
        writeCsvRow(csvRow, csvFile);
    }
}

Other things to note

  1. The export mapper and export formatter are thread-safe, but they are not declared with @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  2. The method writeCsvRow is synchronized.
  3. There is a high number of errors that cause the exportMapper to throw an exception and trigger the retry advice
  4. The incoming message rate is 120/s
  5. The ratio between the incoming rate and the delivery rate is usually 5:1

My theories on what is wrong are

  1. The high number of errors is causing a large number of retries and degrading performance. I would be better off putting the bad messages in an error queue.
  2. The synchronized writeCsvRow method is somehow causing problems with a higher-level thread managed by spring-amqp.

My question is, which theory is right? Is the impact of the retry advice the problem?


Solution

    On the points you noted:

    1. If those beans were not thread-safe, they would also have to be prototype scope.
    2. Since there's only one thread, synchronizing writeCsvRow is not necessary, but it shouldn't hurt.
    3. If the errors are irrecoverable, you should configure the retry policy not to retry those exceptions.
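
Point 3 could look roughly like the sketch below. It assumes a spring-retry version whose SimpleRetryPolicy has the four-argument constructor (maxAttempts, retryableExceptions, traverseCauses, defaultValue). UnmappableEventException and SelectiveRetryTemplateFactory are hypothetical names, standing in for whatever exportMapper actually throws on bad input and for wherever you build the template. traverseCauses is set to true on the assumption that the container wraps listener exceptions before the advice sees them.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class SelectiveRetryTemplateFactory {

    /** Hypothetical: stands in for the exception exportMapper throws on bad input. */
    public static class UnmappableEventException extends RuntimeException {
        public UnmappableEventException(String message) {
            super(message);
        }
    }

    public static RetryTemplate create(int maxAttempts, long backOffMs) {
        // false = never retry this exception type (or its subclasses).
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(UnmappableEventException.class, false);

        // traverseCauses = true: also inspect wrapped causes.
        // defaultValue = true: anything not listed is still retried.
        SimpleRetryPolicy retryPolicy =
                new SimpleRetryPolicy(maxAttempts, retryable, true, true);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(backOffMs);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }
}
```

The resulting template would be passed to setRetryOperations(...) in place of the one built in initializeRetryPolicy(). A message that fails with the non-retryable exception then goes straight to the message recoverer without any back-off sleep.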

    On your two theories:

    1. With those retry settings, you will suspend the container thread for 250ms each time you get an error. So yes, it will hurt performance.
    2. The synchronized method shouldn't add significant overhead.
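
To put a rough number on the first point: with 5 attempts and a 250ms fixed back-off, a message that fails every attempt keeps its container thread asleep for 4 x 250 = 1000ms, since the template sleeps between attempts but not after the last one. The sketch below is a back-of-the-envelope estimate, not a measurement. The class and method names, the 1ms handling time, and the failure ratios are all assumptions chosen for illustration, and the handling time of the failed attempts themselves is ignored.

```java
public class RetryCostEstimate {

    /** Total back-off sleep for one message that fails every attempt. */
    public static long backOffPerExhaustedMessageMs(int maxAttempts, long backOffMs) {
        // Sleeps happen between attempts, so there are (maxAttempts - 1) of them.
        return (long) (maxAttempts - 1) * backOffMs;
    }

    /** Upper bound on messages per second for a single consumer thread. */
    public static double maxThroughputPerThread(double failureRatio,
                                                double handleMs,
                                                long sleepPerFailureMs) {
        // Average wall-clock time per message: normal handling plus the
        // back-off sleep weighted by how often a message fails permanently.
        double averageMs = handleMs + failureRatio * sleepPerFailureMs;
        return 1000.0 / averageMs;
    }

    public static void main(String[] args) {
        long sleepMs = backOffPerExhaustedMessageMs(5, 250);
        System.out.println("Sleep per permanently failing message: " + sleepMs + " ms");

        // Assumed failure ratios and an assumed 1 ms handling time.
        for (double ratio : new double[] {0.01, 0.05, 0.20}) {
            System.out.printf("failure ratio %.2f -> at most %.1f msg/s per thread%n",
                    ratio, maxThroughputPerThread(ratio, 1.0, sleepMs));
        }
    }
}
```

Under these assumptions, a thread where 5% of messages fail permanently tops out at roughly 20 messages per second, well short of a 120/s incoming rate unless the load is spread over several containers.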