I'm running a Kafka service with replication factor 2
, the producers are requiring 1
ack on write.
On the consumer side we're not using consumer groups, all the consumers are connecting to explicitly configured partition numbers.
We have observed many times that when we do a rolling restart of the Kafka cluster, at the moment when a Kafka server goes down the consumers which are consuming the partitions of which that Kafka server was the leader receive corrupted messages for a few milliseconds.
I have not identified a pattern in the corruption, it looks like certain parts of the messages just get replaced with random values. Even fields which are supposed to always be ascii characters end up with non-ascii byte values.
During all other times while Kafka isn't being restarted, the consumers never receive any of these corrupted messages, this only happens when Kafka gets restarted.
On the producer and consumer side we're using the segment-io Kafka client: https://github.com/segmentio/kafka-go
I'm currently not sure yet if this is a Kafka problem of a problem in the segment-io Kafka client.
On the producer side we use segmentio/kafka-go
's Writer.WriteMessages()
to publish messages to Kafka, this method takes a context to cancel the operation asynchronously and the Kafka messages to publish:
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error
We pool and re-use the byte slices in the given messages after a message has been published to reduce the load on Go's GC, which usually works fine, except in a scenario where .WriteMessages()
gets passed a context that gets canceled before the passed messages have been written successfully.
We pass a context with a timeout of 2s
into .WriteMessages()
, this is usually no problem because it never takes 2s
until the messages have been written to Kafka, except in a situation where one of the Kafkas gets shut down and the writer needs to switch to the other replica serving the affected partitions. In this situation the context timed out and .WriteMessages()
returned with an error, then we re-used the byte slices of the messages that we previously passed into it to build the next batch of messages but at this moment the writer still had these messages in its internal queue and wrote them later while/after we already re-used them.
I have submitted a PR to the segmentio/kafka-go
project to clarify this in a comment: