Search code examples
apache-kafkarhel

Some replica are not in sync when installing scratch Kafka cluster


we are installing new Apache Kafka - version 2.7 on Linux machines version RHEL 7.9 total Kafka machines in the cluster are - 5 machines

now installation is completed , but we noticed that not all ISR are in Sync

I want to share all the reason that maybe explain what cause replica to be not in Sync

Slow replica: A follower replica that is consistently not able to catch up with the writes on the leader for a certain period of time. One of the most common reasons for this is an I/O bottleneck on the follower replica causing it to append the copied messages at a rate slower than it can consume from the leader.

Stuck replica: A follower replica that has stopped fetching from the leader for a certain period of time. A replica could be stuck either due to a GC pause or because it has failed or died.

Bootstrapping replica: When the user increases the replication factor of the topic, the new follower replicas are out-of-sync until they are fully caught up to the leader’s log.

but since we are dealing with new scratch Kafka cluster , then I wonder if the problem with ISR that are not in sync maybe related to some parameters in Kafka server.properties that are not set as well

here is example about __consumer_offsets topic we can see many missing ISR's

Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 1003    Replicas: 1003,1001,1002        Isr: 1003,1001,1002
        Topic: __consumer_offsets       Partition: 1    Leader: 1001    Replicas: 1001,1002,1003        Isr: 1001,1003,1002
        Topic: __consumer_offsets       Partition: 2    Leader: 1003    Replicas: 1002,1003,1001        Isr: 1003,1001
        Topic: __consumer_offsets       Partition: 3    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1001
        Topic: __consumer_offsets       Partition: 4    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003
        Topic: __consumer_offsets       Partition: 5    Leader: 1001    Replicas: 1002,1001,1003        Isr: 1003,1001,1002
        Topic: __consumer_offsets       Partition: 6    Leader: 1003    Replicas: 1003,1001,1002        Isr: 1003,1001,1002
        Topic: __consumer_offsets       Partition: 7    Leader: 1001    Replicas: 1001,1002,1003        Isr: 1001,1003,1002
        Topic: __consumer_offsets       Partition: 8    Leader: 1003    Replicas: 1002,1003,1001        Isr: 1003,1001
        Topic: __consumer_offsets       Partition: 9    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1001
        Topic: __consumer_offsets       Partition: 10   Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003
        Topic: __consumer_offsets       Partition: 11   Leader: 1001    Replicas: 1002,1001,1003        Isr: 1003

here is example to what we have in server.properties

but after googled a while , we not found what can avoid the problem of ISR that are not in sync

auto.create.topics.enable=false
auto.leader.rebalance.enable=true
background.threads=10
log.retention.bytes=-1
log.retention.hours=12
delete.topic.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
log.dir=/var/kafka/kafka-data
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=1000
log.flush.offset.checkpoint.interval.ms=60000
log.flush.scheduler.interval.ms=9223372036854775807
log.flush.start.offset.checkpoint.interval.ms=60000
compression.type=producer
log.roll.jitter.hours=0
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
message.max.bytes=1000012
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=10080
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
queued.max.requests=500
quota.consumer.default=9223372036854775807
quota.producer.default=9223372036854775807
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
request.timeout.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.max.timeout.ms=900000
transaction.state.log.load.buffer.size=5242880
transaction.state.log.min.isr=2
transaction.state.log.num.partitions=50
transaction.state.log.replication.factor=3
transaction.state.log.segment.bytes=104857600
transactional.id.expiration.ms=604800000
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=600000
zookeeper.max.in.flight.requests=10
zookeeper.session.timeout.ms=600000
zookeeper.set.acl=false
broker.id.generation.enable=true
connections.max.idle.ms=600000
connections.max.reauth.ms=0
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.socket.timeout.ms=30000
default.replication.factor=2
delegation.token.expiry.time.ms=86400000
delegation.token.max.lifetime.ms=604800000
delete.records.purgatory.purge.interval.requests=1
fetch.purgatory.purge.interval.requests=1000
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=1800000
group.max.size=2147483647
group.min.session.timeout.ms=6000
log.213`1234cleaner.backoff.ms=15000
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.delete.retention.ms=86400000
log.cleaner.enable=true
log.cleaner.io.buffer.load.factor=0.9
log.cleaner.io.buffer.size=524288
log.cleaner.io.max.bytes.per.second=1.7976931348623157e308
log.cleaner.max.compaction.lag.ms=9223372036854775807
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
log.cleaner.threads=1
log.cleanup.policy=delete
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.message.timestamp.difference.max.ms=9223372036854775807
log.message.timestamp.type=CreateTime
log.preallocate=false
log.retention.check.interval.ms=300000
max.connections=2147483647
max.connections.per.ip=2147483647
max.incremental.fetch.session.cache.slots=1000
num.partitions=1
producer.purgatory.purge.interval.requests=1000
queued.max.request.bytes=-1
replica.fetch.backoff.ms=1000
replica.fetch.max.bytes=1048576
replica.fetch.response.max.bytes=10485760
reserved.broker.max.id=1500
transaction.abort.timed.out.transaction.cleanup.interval.ms=60000
transaction.remove.expired.transaction.cleanup.interval.ms=3600000
zookeeper.sync.time.ms=2000
broker.rack=/default-rack

we'll appreciate , to get suggestions to how to improve the replica to be in Sync

links

Fixing under replicated partitions in kafka

https://emilywibberley.com/blog/kafka-how-to-fix-out-of-sync-replicas/

What is a right value for replica.lag.time.max.ms?

https://strimzi.io/blog/2021/06/08/broker-tuning/

https://community.cloudera.com/t5/Support-Questions/Kafka-Replica-out-of-sync-for-over-24-hrs/m-p/82922

https://hevodata.com/learn/kafka-replication/

here are the options that we consider to do ( but only as suggestion not solution )

  1. restart Kafka brokers , each Kafka step by step

  2. remove the non in SYNC replica by rm -rf , as example rm -rf TEST_TOPIC_1 , and hope that Kafka will create this replica and as results it will be in SYNC

  3. try to use the kafka-reassign-partitions

  4. maybe ISR will be in Sync after some time ?

  5. increase replica.lag.time.max.ms to higher value as 1 day and restart the brokers

    The definition of synchronization depends on the topic configuration, but by default, this means that the replica has been or has been fully synchronized with the leader in the last 10 seconds. The settings for this time period are:replica.lag.time.max.ms, and has a server default value, which can be overridden by each topic.

What is the ISR?

The ISR is simply all the replicas of a partition that are "in-sync" with the leader. The definition of "in-sync" depends on the topic configuration, but by default, it means that a replica is or has been fully caught up with the leader in the last 10 seconds. The setting for this time period is: replica.lag.time.max.ms and has a server default which can be overridden on a per topic basis. At a minimum the, ISR will consist of the leader replica and any additional follower replicas that are also considered in-sync. Followers replicate data from the leader to themselves by sending Fetch Requests periodically, by default every 500ms. If a follower fails, then it will cease sending fetch requests and after the default, 10 seconds will be removed from the ISR. Likewise, if a follower slows down, perhaps a network related issue or constrained server resources, then as soon as it has been lagging behind the leader for more than 10 seconds it is removed from the ISR.

Some other important related parameters to be configured are:

min.insync.replicas: Specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. offsets.retention.check.interval.ms: Frequency at which to check for stale Offsets.

offsets.topic.segment.bytes: This should be kept relatively small in order to facilitate faster Log Compaction and Cache Loads. replica.lag.time.max.ms: If the follower has not consumed the Leaders log OR sent fetch requests, for at least this much time, it is removed from the ISR.

replica.fetch.wait.max.ms: Max wait time for each fetcher request issued by follower replicas, must be less than the replica.lag.time.max.ms to avoid shrinking of ISR.

transaction.max.timeout.ms: In case a client requests a timeout greater than this value, it’s not allowed so as to not stall other consumers. zookeeper.session.timeout.ms: Zookeeper session timeout.

zookeeper.sync.time.ms: How far a follower can be behind a Leader, setting this too high can result in an ISR that has potentially many out-of-sync nodes.


Solution

  • time-related settings aren't what you want; if you increase these, it just means it will take longer for Kafka to show you the problem, meanwhile your data actually gets further behind. For a brand new cluster, you should have no out of sync ISR until you start adding load...

    Increasing num.replica.fetchers and num.network.threads will allow the brokers to read replicas over the network faster. At most, you can try setting these to the number of CPU cores on the machines.

    Smaller segment bytes can be used to increase replication, but it's better to set that on a per-topic basis, for compaction only, not to adjust replication cluster-wide.