Search code examples
spring-bootapache-kafkaamazon-ecsspring-kafka

How to handle large backlog of Kafka messages in a production environment, especially when your application is newly deployed?


I have a spring boot application freshly deployed on Production environment (on AWS) consuming Kafka messages on various topics which already have thousands of messages (~25K) produced prior to the deployment.

I have observed that the Kafka consumer logs in my application are printed only after below the Spring boot application is started, however, the container keeps restarting due to failed ECS health check after every 3 mins. We also tried increasing the health check period to 15 mins, but still it keeps restarting.

INFO [m - Started Application in 13.364 seconds (JVM running for 14.489)

Below is my Kafka topic configuration if this information helps.

  • Spring boot version: v2.6.6
  • Kafka Version: 3.0.1
  • org.mybatis.spring.boot : 2.2.2

Kafka Consumer Config as below:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [***.com:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-appmanager-1
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = appmanager
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

KafkaConsumerConfig file methods as below:

@Bean
public ConsumerFactory<String, ABCMessage> consumerFactory() {
    Map<String, Object> props = getConsumerProperties(ABCDeserializer.class, FailedABCConfigRequestMessageProvider.class, ABCMessage.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ABCMessage> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ABCMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

A sample snippet of my consumer code is as follows:

@KafkaListener(topics = "${abc.config.topic.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, ABCMessage> cr) {
    logger.info("Received Message: {}", cr.value().toString());

    try {
        abcConfigProcessor.processMessage(cr.value());
    } catch (Exception e) {
        logger.error("Message cannot be processed. Error: {}", e.getMessage());
    }
}

How can I enable my application to be deployed in Production and gracefully absorbing the backlog in background and also successfully respond to periodic ECS health check?


Solution

  • As highlighted in the comments above, the issue for us actually turned out to be related to incorrect ECS health check configuration. It was a missing security group/firewall port that was not allowed. Due to this incorrect configuration, the ping API was unable to reach the application, resulting in a failed ECS health check and the container getting restarted.