
Spring Kafka - Encountering "Magic v0 does not support record headers" error


I'm running a Spring Boot application and have added the dependency compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE').

I'm trying to connect to a Cloudera installation running this version:

Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50

My KafkaProducerConfig class is pretty straightforward:

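import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.template.default-topic}")
    private String defaultTopicName;

    @Value("${spring.kafka.producer.compression-type}")
    private String producerCompressionType;

    @Value("${spring.kafka.producer.client-id}")
    private String producerClientId;

    // Raw producer properties shared by the factory below.
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
        // Ask JsonSerializer not to add type information headers.
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

        return props;
    }

    @Bean
    public ProducerFactory<String, Pdid> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Pdid> kafkaTemplate() {
        KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());

        kafkaTemplate.setDefaultTopic(this.defaultTopicName);

        return kafkaTemplate;
    }

    // Log the effective configuration once the bean is initialized.
    @PostConstruct
    public void postConstruct() {
        LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
        LOGGER.info("Kafka topic name: " + this.defaultTopicName);
    }

}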

When I start up the application, I receive:

2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e

Then I send in a payload. It shows up in Kafka Tool against the topic, but when the data is being ingested, the logs on the Kafka side show:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}
java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

I've tried the following things from the Producer application side:

  1. Downgrading to Spring Kafka 2.0.4. I was hoping that dropping down to the 0.11.0 Kafka client would get rid of the problem, but it had no effect.
  2. Verified that the nodes are all the same version. According to my admin, they are.
  3. Verified with my admin that we don't have a mixed installation. Again, I was told that we do not.
  4. Based on a similar Stack Overflow question, I came back up to 2.1.5 and set JsonSerializer.ADD_TYPE_INFO_HEADERS to false. I thought this might remove the headers that the log was referring to. Again, no luck: the same error was logged.

I'm hoping I'm missing something obvious. Are there any additional headers I need to turn on/off to help resolve the Magic v0 issue that anyone is aware of?

We have other applications which write to other topics within the same environment successfully, but they are older applications that are hand-crafting the necessary Spring beans. Additionally, those applications are also using a much older client (0.8.2.2) and they are using a StringSerializer for the Producer value instead of JSON. I need my data to be in JSON and I really don't want to downgrade to 0.8.2.2 when we are on a system that should support 0.11.x.


Solution

  • but they are older applications that are hand-crafting the necessary Spring beans.

    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)

    I am not familiar with Kafka broker internals, but it "sounds" like the topics were created with an old broker and their record format doesn't support headers, so the problem would be the topic format rather than the broker version itself (hint: downConvert).

    Have you tried this with a clean broker?

    The 1.0.x client can talk to older brokers (back to 0.10.2.x IIRC) as long as you don't try to use features that are not supported by the broker. The fact that your broker is 0.11 (which does support headers) is a further indication that it's the topic record format that's the issue.

    I have successfully tested up/down broker/client/topic versions without problems, as long as you use the common feature subset.
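To make the downConvert hint concrete, here is a simplified model in plain Java. It is not Kafka's code: the class DownConvertSketch and the type StoredRecord are hypothetical names invented for illustration. The only parts taken from the question are the exception text from the broker log and the rule behind it, which as far as I know is that message formats v0 and v1 have no field for headers while v2 (introduced with 0.11) does.

```java
import java.util.Collections;
import java.util.List;

// Simplified model, NOT Kafka's actual classes: it only mirrors the rule that
// message formats v0 and v1 cannot carry record headers.
final class DownConvertSketch {

    // Header support arrived with message format v2.
    static final byte MAGIC_V2 = 2;

    // Hypothetical stand-in for a stored record: a value plus its header keys.
    static final class StoredRecord {
        final String value;
        final List<String> headerKeys;

        StoredRecord(String value, List<String> headerKeys) {
            this.value = value;
            this.headerKeys = headerKeys;
        }
    }

    // Rewrites a record into an older format; fails if headers would be lost.
    static String downConvert(StoredRecord record, byte targetMagic) {
        if (targetMagic < MAGIC_V2 && !record.headerKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Magic v" + targetMagic + " does not support record headers");
        }
        return "v" + targetMagic + ":" + record.value;
    }

    public static void main(String[] args) {
        StoredRecord plain =
                new StoredRecord("{\"id\":1}", Collections.<String>emptyList());
        StoredRecord withHeader =
                new StoredRecord("{\"id\":2}", Collections.singletonList("example-header"));

        // A record without headers can be rewritten into the old format.
        System.out.println(downConvert(plain, (byte) 0));

        // A record with a header cannot, which is the failure seen in the log.
        try {
            downConvert(withHeader, (byte) 0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the failure depends on two things together: a record that actually carries a header, and a target format older than v2. Removing either one makes the conversion succeed.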

    JsonSerializer.ADD_TYPE_INFO_HEADERS to false.

    That should prevent the framework from setting any headers; you need to show your producer code (and all the configuration).

    You could also add a ProducerInterceptor to the producer config and examine the ProducerRecord headers property in the onSend() method, to figure out what's setting headers in the output message.

    If you are using spring-messaging messages (template.send(Message<?> m)), headers will be mapped by default. Using the raw template.send() methods will not set any headers (unless you send a ProducerRecord with headers populated).
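The ProducerInterceptor suggestion above could look roughly like the following. This is a minimal sketch, not a tested diagnostic: the class name HeaderLoggingInterceptor is hypothetical, it prints to standard output instead of using a logger, and it needs the kafka-clients library on the classpath. The interface methods and ProducerRecord.headers() are part of the Kafka client API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;

// Hypothetical class name; prints every header found on an outgoing record.
public class HeaderLoggingInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        for (Header header : record.headers()) {
            byte[] raw = header.value();
            String value = raw == null ? "null" : new String(raw, StandardCharsets.UTF_8);
            System.out.println("Header on topic " + record.topic()
                    + ": " + header.key() + " = " + value);
        }
        // Pass the record through unchanged.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Nothing to inspect for this diagnostic.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
```

To register it, add props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, HeaderLoggingInterceptor.class.getName()) to the producerConfigs() map. One caveat: as far as I know, onSend() runs before the key and value are serialized, so headers added by a serializer would not be visible here; the interceptor shows only headers already present on the ProducerRecord when it is sent.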