Tags: node.js, apache-kafka, confluent-platform, confluent-cloud, kafkajs

How to stop receiving kafka-metadata messages on a topic


Sometimes, for no apparent reason, our Confluent Kafka consumer receives weird messages on one specific topic:

{
  "magicByte": 2,
  "attributes": 0,
  "timestamp": "1623227829187",
  "offset": "11575814",
  "key": null,
  "value": {
    "type": "Buffer",
    "data": []
  },
  "headers": {},
  "isControlRecord": false,
  "batchContext": {
    "firstOffset": "11575814",
    "firstTimestamp": "1623227829187",
    "partitionLeaderEpoch": 5,
    "inTransaction": false,
    "isControlBatch": false,
    "lastOffsetDelta": 0,
    "producerId": "-1",
    "producerEpoch": -1,
    "firstSequence": -1,
    "maxTimestamp": "1623227829187",
    "timestampType": 0,
    "magicByte": 2
  }
}

We have consumed this exact same message multiple times, even though we swallow all errors and commit the offset even after a failure.

It happens only on one specific topic. We deleted and re-created this topic from scratch, and it did not fix the issue.

In the following screenshot, we see that one message has an empty value while the other messages do not. What does that mean?

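The dump above shows a record whose value is a zero-length Buffer (`"data": []`). KafkaJS hands such records to the handler like any other, so one defensive option is to detect and skip them explicitly. A minimal sketch, assuming only the `key`/`value` fields from the dump (`MessageLike` and `hasEmptyValue` are illustrative names, not KafkaJS API):

```typescript
// Minimal shape of the fields we inspect; the real KafkaJS
// KafkaMessage type carries more fields (offset, headers, ...).
interface MessageLike {
  key: Buffer | null;
  value: Buffer | null;
}

// True when the record carries no payload: either a null value
// (e.g. a tombstone on a compacted topic) or a zero-length
// Buffer, as in the dump above ("data": []).
function hasEmptyValue(message: MessageLike): boolean {
  return message.value === null || message.value.length === 0;
}
```

Inside an `eachBatch` handler you could then resolve the offset of such a record and move on instead of attempting to parse it.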


Producer - C#:

_producerBuilder = new ProducerBuilder<Null, string>(new ProducerConfig
{
    BootstrapServers = _kafka.BootstrapServers,
    ClientId = Dns.GetHostName(),
    LingerMs = _kafka.LingerMsKafka,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = _kafka.SaslUsername,
    SaslPassword = _kafka.SaslPassword,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    EnableSslCertificateVerification = false
}).Build();

Consumer - Node.js:

this.kafka = new Kafka.Kafka({
  brokers: [`${connectionOptions.host}:${connectionOptions.port}`],
  logLevel: Kafka.logLevel.NOTHING,
  ssl: !!connectionOptions.sasl,
  sasl: connectionOptions.sasl,
})
// ...
this.consumer = this.kafka.consumer({
  groupId: this.groupId,
})
// ...
await this.consumer.connect()
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
await this.consumer.run({
  eachBatch: async ({ batch }: EachBatchPayload) => { /* ... */ },
})
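To keep a single bad record from being redelivered forever, the offset of every record in the batch, including empty or failing ones, has to be resolved before moving on. A sketch of that loop as a pure function over the batch's messages (`offsetsToResolve`, `BatchMessage`, and `handle` are illustrative names, not KafkaJS API; in a real handler you would call KafkaJS's `resolveOffset` for each returned offset):

```typescript
interface BatchMessage {
  offset: string;
  value: Buffer | null;
}

// Walks a batch in order and returns the offsets that should be
// resolved. Empty-value records are skipped without processing, and
// records whose handler throws are still resolved, matching the
// "commit even after failure" behaviour described above.
function offsetsToResolve(
  messages: BatchMessage[],
  handle: (m: BatchMessage) => void,
): string[] {
  const resolved: string[] = [];
  for (const message of messages) {
    if (message.value !== null && message.value.length > 0) {
      try {
        handle(message);
      } catch {
        // Swallow the error but still resolve the offset below,
        // so the record is not re-fetched on the next poll.
      }
    }
    resolved.push(message.offset);
  }
  return resolved;
}
```

Note that with `eachBatch`, KafkaJS leaves offset resolution and heartbeating to your code; if an empty record's offset is never resolved, the same record can come back on every fetch, which would explain seeing the exact same message repeatedly.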

We also fail to consume the problematic message with the ccloud CLI; it crashes with: "panic: runtime error: index out of range [0] with length 0".

We are pretty lost. Any help will be great!


Node.js: v14.15.4, KafkaJS: 1.16.0-beta.18


Solution

  • This issue happened on Confluent Kafka (dev).

    The support team did not solve it.

    After moving to https://aiven.io/, the issue was never reproduced again.