Tags: java, spring, spring-boot, apache-kafka, reactive-kafka

Issue with Kafka Broker - UnknownServerException


Our application uses springBootVersion = 2.0.4.RELEASE along with the compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE') dependency.

Our Kafka broker is at version 1.0.1.

Intermittently, when we send messages to Kafka by creating a reactor.kafka.sender.SenderRecord and then inspect reactor.kafka.sender.SenderResult.exception() in the response from Kafka, we find the following populated in the exception:

java.lang.RuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request

Upon retrying a couple of times, the messages get through successfully.
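
For context, this is roughly how we produce records and inspect the results. The bootstrap address, topic name, and serializers below are illustrative, not our exact configuration:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.core.publisher.Flux;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;
    import reactor.kafka.sender.SenderRecord;

    public class ReactiveSendSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // stopOnError(false) delivers a SenderResult per record (with exception()
            // populated on failure) instead of failing the whole Flux on first error.
            KafkaSender<String, String> sender =
                    KafkaSender.create(SenderOptions.<String, String>create(props).stopOnError(false));

            // Wrap the ProducerRecord in a SenderRecord; the correlation metadata
            // (here simply the key) comes back on the matching SenderResult.
            SenderRecord<String, String, String> record = SenderRecord.create(
                    new ProducerRecord<>("price-promotions-local-event", "key", "value"), "key");

            sender.send(Flux.just(record))
                  .doOnNext(result -> {
                      // exception() is non-null when the broker rejected the record,
                      // e.g. with the UnknownServerException above.
                      if (result.exception() != null) {
                          result.exception().printStackTrace();
                      }
                  })
                  .blockLast();

            sender.close();
        }
    }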

In the broker logs, the error below is printed multiple times, without any stack trace:

[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)

where price-promotions-local-event is our topic.

I have looked online, but there is no definitive resolution or way to triage this issue. Many thanks in advance for any help.


Solution

  • On further investigation, we were able to capture the stack trace in the broker logs:

    ERROR [ReplicaManager broker=1] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)
    java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:442)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:595)
        at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$2(LogValidator.scala:138)
        at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$2$adapted(LogValidator.scala:136)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$1(LogValidator.scala:136)
        at kafka.log.LogValidator$.$anonfun$convertAndAssignOffsetsNonCompressed$1$adapted(LogValidator.scala:133)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.LogValidator$.convertAndAssignOffsetsNonCompressed(LogValidator.scala:133)
        at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:64)
        at kafka.log.Log.liftedTree1$1(Log.scala:654)
        at kafka.log.Log.$anonfun$append$2(Log.scala:642)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
        at kafka.log.Log.append(Log.scala:624)
        at kafka.log.Log.appendAsLeader(Log.scala:597)
        at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:499)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:223)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:487)
        at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:724)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
        at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:138)
        at scala.collection.TraversableLike.map(TraversableLike.scala:234)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:708)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:459)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:465)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
        at java.lang.Thread.run(Thread.java:748)
    

    In the MemoryRecordsBuilder class from org.apache.kafka:kafka-clients:1.0.2, we can see where this IllegalArgumentException is thrown:

    
    if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
      throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
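
    Record headers require message format v2 (magic value 2), introduced in Kafka 0.11; the broker takes this code path when it has to down-convert incoming batches to an older on-disk format. So even on a 1.0.1 broker, a topic (or broker) whose message.format.version is set below 0.11.0 will reject records that carry headers. A minimal sketch of how that setting could be checked with the AdminClient (the bootstrap address is an assumption):

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.Config;
        import org.apache.kafka.common.config.ConfigResource;

        public class TopicFormatCheck {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

                try (AdminClient admin = AdminClient.create(props)) {
                    ConfigResource topic = new ConfigResource(
                            ConfigResource.Type.TOPIC, "price-promotions-local-event");
                    Config config = admin.describeConfigs(Collections.singleton(topic))
                                         .all().get().get(topic);
                    // Anything below 0.11.0 means magic < v2, so record headers are rejected.
                    System.out.println(config.get("message.format.version"));
                }
            }
        }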
    
    

    So, headers were being set on the ProducerRecord, and this was causing the issue. Upon printing the ProducerRecord we discovered that the headers were added by AppDynamics: a “singularityheader” is added to the produced Kafka record.

    c.t.a.p.i.m.i.KafkaProducerInterceptor   : The kafka Interceptor ProducerRecord header:: RecordHeader(key = singularityheader, value = [110, 111, 116, 120, 100, 101, 116, 101, 99, 116, 61, 116, 114, 117, 101, 42, 99, 116, 114, 108, 103, 117, 105, 100, 61, 49, 53, 53, 49, 51, 55, 51, 54, 57, 49, 42, 97, 112, 112, 73, 100, 61, 55, 49, 48, 51, 50, 42, 110, 111, 100, 101, 105, 100, 61, 49, 51, 53, 55, 53, 51, 53])
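
    Decoding that byte array as UTF-8 makes the injected AppDynamics correlation payload readable; a minimal sketch:

        import java.nio.charset.StandardCharsets;

        public class DecodeHeader {
            public static void main(String[] args) {
                byte[] value = {110, 111, 116, 120, 100, 101, 116, 101, 99, 116, 61, 116,
                        114, 117, 101, 42, 99, 116, 114, 108, 103, 117, 105, 100, 61, 49,
                        53, 53, 49, 51, 55, 51, 54, 57, 49, 42, 97, 112, 112, 73, 100, 61,
                        55, 49, 48, 51, 50, 42, 110, 111, 100, 101, 105, 100, 61, 49, 51,
                        53, 55, 53, 51, 53};
                // Prints: notxdetect=true*ctrlguid=1551373691*appId=71032*nodeid=1357535
                System.out.println(new String(value, StandardCharsets.UTF_8));
            }
        }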
    

    More reading: https://developer.ibm.com/messaging/2018/07/10/additional-rfh-header-added-appdynamics-monitor-agent-tool/

    So we explicitly set the headers to null in our interceptor, and this resolved the issue.
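
    A rough sketch of that fix; the class name is hypothetical, and the interceptor must be registered with the producer via the interceptor.classes (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) property:

        import java.util.Map;

        import org.apache.kafka.clients.producer.ProducerInterceptor;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.clients.producer.RecordMetadata;

        // Hypothetical interceptor that drops all headers (including the
        // AppDynamics "singularityheader") before the record is sent.
        public class HeaderStrippingInterceptor implements ProducerInterceptor<String, String> {

            @Override
            public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
                // Rebuild the record with headers = null so no headers reach the broker.
                return new ProducerRecord<>(record.topic(), record.partition(),
                        record.timestamp(), record.key(), record.value(), null);
            }

            @Override
            public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

            @Override
            public void close() { }

            @Override
            public void configure(Map<String, ?> configs) { }
        }

    Alternatively, raising the topic's message.format.version to 0.11.0 or later should also let the broker accept records with headers, since that format supports magic v2.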