Search code examples
javaspring-bootamazon-ecsaws-fargateforkjoinpool

SQS consumption too slow even with parallelism


I am facing a challenging problem that I have not been able to solve despite trying multiple things, so I am here to ask the experts.

I have an SQS queue that receives 100k messages every 15 minutes, and a consumer in a Spring Boot application that polls 10 messages at a time and then processes them in parallel on a fork-join pool.

I am running the Spring application on 10 ECS containers, each with 4 vCPUs and 8 GB of memory.

Can someone please help me understand why my consumption is not getting any faster?

/**
 * Scheduled trigger for the stock range consumer.
 *
 * <p>The bean is registered in every profile except {@code test}. Each scheduler tick
 * delegates to {@link StockRangeConsumer#consume()}.
 */
@Component
@Profile("!test")
public class StockRangeConsumerScheduler {

    /** Consumer invoked on every scheduler tick. */
    private final StockRangeConsumer consumer;

    public StockRangeConsumerScheduler(StockRangeConsumer stockRangeSQSConsumer) {
        this.consumer = stockRangeSQSConsumer;
    }

    /**
     * Runs one consumption cycle.
     *
     * <p>The initial delay and the fixed delay between runs are read from the
     * {@code stockrange.scheduled.delay.initial} and {@code stockrange.scheduled.delay.fixed}
     * properties.
     *
     * @throws ExecutionException   propagated from {@link StockRangeConsumer#consume()}
     * @throws InterruptedException propagated from {@link StockRangeConsumer#consume()}
     */
    @Scheduled(
            initialDelayString = "${stockrange.scheduled.delay.initial}",
            fixedDelayString = "${stockrange.scheduled.delay.fixed}")
    public void process() throws ExecutionException, InterruptedException {
        consumer.consume();
    }

}

and this is the consumer

/**
 * Consumes range prediction messages from the stock range queue and forwards the valid
 * ones to the {@link StockRangeDataProcessor}.
 *
 * <p>Each call to {@link #consume()} retrieves one batch of messages and processes the
 * batch in parallel from inside a dedicated {@link ForkJoinPool} of {@link #THREADS} threads.
 */
@Service
@Profile({"aws", "vanguard"})
public class StockRangeConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(StockRangeConsumer.class);

    /** Parallelism of the pool used to process one retrieved batch. */
    public static final int THREADS = 4;

    private final ObjectMapper objectMapper;
    private final SqsMessageConsumer stockRangeConsumer;
    // NOTE(review): this pool is never shut down by this class — confirm whether a
    // shutdown hook on bean destruction is needed.
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(THREADS);

    private final StockRangeDataProcessor stockRangeDataProcessor;
    private final PilotFacilityFilterService pilotFacilityFilterService;

    public StockRangeConsumer(StockRangeDataProcessor stockRangeDataProcessor,
                              @Qualifier(StockRangeConfiguration.STOCK_RANGE_CONSUMER) SqsMessageConsumer stockRangeConsumer,
                              ObjectMapper objectMapper,
                              PilotFacilityFilterService pilotFacilityFilterService) {
        this.pilotFacilityFilterService = pilotFacilityFilterService;
        this.objectMapper = objectMapper;
        this.stockRangeConsumer = stockRangeConsumer;
        this.stockRangeDataProcessor = stockRangeDataProcessor;
    }

    /**
     * Retrieves one batch of messages and processes every message of the batch.
     *
     * <p>The call blocks until the whole batch has been handled. A failure while handling a
     * single message is logged and does not affect the other messages of the batch; the
     * failed message is not deleted from the queue.
     *
     * @throws ExecutionException   if the submitted batch task itself fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void consume() throws ExecutionException, InterruptedException {
        List<Message> messages = stockRangeConsumer.retrieve();
        LOGGER.info("Number of available core in the processor is: {}", Runtime.getRuntime().availableProcessors());

        // Nothing to do for an empty poll; avoid submitting a no-op task on every tick.
        if (messages == null || messages.isEmpty()) {
            return;
        }

        final int batchSize = messages.size();
        // The parallel stream is started from inside a task of the dedicated pool.
        // NOTE(review): relies on the stream then running in that pool rather than the
        // common pool — this is JDK implementation behaviour, confirm for the target JDK.
        forkJoinPool.submit(() -> messages.parallelStream().forEach(message -> handle(message, batchSize))).get();

    }

    /**
     * Deserializes, processes and then deletes a single message.
     *
     * <p>The part key used for logging is read null-safely, so a payload that deserializes
     * to {@code null} reaches the validation in
     * {@link #processRangePredictionData(RangePredictionData)} and is then deleted like any
     * other invalid payload, instead of failing with a {@link NullPointerException} and
     * staying on the queue.
     *
     * @param message   the message to handle
     * @param batchSize size of the batch the message belongs to (debug logging only)
     */
    private void handle(Message message, int batchSize) {
        try {
            RangePredictionData rangePredictionData = toRangePrediction(message.getMessage());
            Object partKey = rangePredictionData == null ? null : rangePredictionData.getPartKey();
            LOGGER.info("Starting run analysis for part key: {}", partKey);
            LOGGER.debug("***** RangePredictionData with PartKey: {}, list size: {}", partKey, batchSize);
            processRangePredictionData(rangePredictionData);
            LOGGER.info("Ended run analysis for part key: {}", partKey);
            stockRangeConsumer.delete(message);
        } catch (Exception e) {
            // Boundary catch: one bad message must not abort the rest of the batch.
            LOGGER.error("Error reading message from stock range queue : ", e);
        }
    }

    // TODO move logic to usecase/service and this class to adapter
    /**
     * Validates the given data and, when the pilot facility filter accepts its facility and
     * usage code, passes it to the {@link StockRangeDataProcessor}.
     *
     * <p>Data that is {@code null}, has no part key or has no range calculation data is
     * logged as an error and skipped.
     *
     * @param rangePredictionData the deserialized message payload, may be {@code null}
     */
    public void processRangePredictionData(RangePredictionData rangePredictionData) {
        if (rangePredictionData == null || rangePredictionData.getPartKey() == null || rangePredictionData.getRangeCalculationData() == null) {
            LOGGER.error("Range prediction consumed, but is null or missing range calculation data");
            return;
        }

        if (pilotFacilityFilterService.isPilotFacilityAndExcludedUsageCode(rangePredictionData.getPartKey().getFacilityCode(), rangePredictionData.getPartKey().getUsageCode())) {
            stockRangeDataProcessor.processRangePredictionData(rangePredictionData);
        }
    }

    /** Deserializes the raw JSON message body into a {@link RangePredictionData}. */
    private RangePredictionData toRangePrediction(String message) throws JsonProcessingException {
        return objectMapper.readValue(message, RangePredictionData.class);
    }

}

this is my configuration

/**
 * Enables Spring's asynchronous method execution and declares the executor bean.
 */
@Configuration
@EnableAsync
public class AsyncConfiguration {

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 10;

    /**
     * Creates a thread pool executor with a core size of 4 and a maximum size of 10.
     *
     * <p>NOTE(review): no queue capacity is set here. With Spring's default (unbounded)
     * queue the pool presumably never grows past the core size — confirm against the
     * {@code ThreadPoolTaskExecutor} documentation.
     *
     * @return the configured executor
     */
    @Bean
    public Executor asyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        return pool;
    }
}

another configuration

/**
 * Enables scheduling and registers a dedicated thread pool scheduler for all
 * scheduled tasks.
 */
@Configuration
@EnableScheduling
public class SchedulerConfiguration implements SchedulingConfigurer {

    private static final int POOL_SIZE = 8;
    private static final String THREAD_NAME_PREFIX = "scheduled-task-";

    /**
     * Installs the scheduler created by {@link #buildScheduler()} on the registrar.
     *
     * @param taskRegistrar registrar that receives the scheduler
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setTaskScheduler(buildScheduler());
    }

    /**
     * Creates and initializes a scheduler with 8 threads whose names start with
     * {@code scheduled-task-} and with remove-on-cancel enabled.
     *
     * <p>NOTE(review): the scheduler is created here rather than declared as a bean —
     * confirm that it is shut down when the application context closes.
     */
    private static ThreadPoolTaskScheduler buildScheduler() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(POOL_SIZE);
        scheduler.setThreadNamePrefix(THREAD_NAME_PREFIX);
        scheduler.setRemoveOnCancelPolicy(true);
        // All settings above are applied before the scheduler is initialized.
        scheduler.initialize();
        return scheduler;
    }
}

This is the SQS configuration; waitTimeSeconds is 0 (short polling).

/**
 * Declares the SQS consumer bean for the stock range queue; active in the {@code aws}
 * profile.
 *
 * <p>All settings are injected from {@code aws.sqs.*} properties. The maximum number of
 * messages defaults to 10 and the endpoint defaults to the empty string.
 */
@Configuration
@Profile("aws")
public class StockRangeConfiguration {

    /** Bean name under which the stock range consumer is registered. */
    public static final String STOCK_RANGE_CONSUMER = "stockRangeSqsConsumer";

    private final String accessKey;
    private final String secretKey;
    private final String region;
    private final String queueUrl;
    private final int maxNumberOfMessages;
    /** Optional endpoint override; empty when none is configured. */
    private final String endpoint;

    public StockRangeConfiguration(@Value("${aws.sqs.stockRange.accessKey}") String accessKey,
                                   @Value("${aws.sqs.stockRange.secret_access_key}") String secretKey,
                                   @Value("${aws.sqs.stockRange.region}") String region,
                                   @Value("${aws.sqs.stockRange.queueUrl}") String queueUrl,
                                   @Value("${aws.sqs.stockRange.maxNumberOfMessages:10}") int maxNumberOfMessages,
                                   @Value("${aws.sqs.endpoint:}") String endpoint) {
        this.endpoint = endpoint;
        this.maxNumberOfMessages = maxNumberOfMessages;
        this.queueUrl = queueUrl;
        this.region = region;
        this.secretKey = secretKey;
        this.accessKey = accessKey;
    }

    /**
     * Builds the consumer for the stock range queue.
     *
     * <p>When an endpoint is configured it is used together with the region; otherwise only
     * the region is set. Credentials, queue URL and maximum number of messages are applied
     * in both cases.
     *
     * @return the consumer registered as {@link #STOCK_RANGE_CONSUMER}
     */
    @Bean(name = STOCK_RANGE_CONSUMER)
    public SqsMessageConsumer stockRangeConsumer() {
        final boolean hasCustomEndpoint = !endpoint.isEmpty();

        if (hasCustomEndpoint) {
            return new SqsMessageConsumer(new SqsConfiguration()
                    .withCredentials(accessKey, secretKey)
                    .withEndpoint(endpoint, region)
                    .withUrl(queueUrl)
                    .withMaxNumberOfMessages(maxNumberOfMessages));
        }

        return new SqsMessageConsumer(new SqsConfiguration()
                .withCredentials(accessKey, secretKey)
                .withRegion(region)
                .withUrl(queueUrl)
                .withMaxNumberOfMessages(maxNumberOfMessages));
    }
}

As additional information: I have set the task scheduler pool size to 8 because there are other schedulers that also run with a fixed delay of 100 ms, each polling 10 messages from other queues.


Solution

  • The problem is quite simple after all.

    The process runs once every 100ms and consumes 10 messages.

    That's a top of 60 messages / second per instance if it wouldn't take any time to process those messages. It's mentioned above that 10 instances consume 30k messages every 5 minutes. That is 50 messages / second (not far away from the calculation above).

    Either increase the number of instances to 17 or, better, stop running the process only once every 100 ms and use long polling so that messages are consumed as soon as they become available.