Tags: spring, spring-boot, apache-kafka, docker-compose, spring-kafka

Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)


I am using this docker-compose setup to run Kafka locally: https://github.com/wurstmeister/kafka-docker/

docker-compose up works fine, and creating topics via the shell works fine as well.

Now I am trying to connect to Kafka via spring-kafka:2.1.0.RELEASE.

When the Spring application starts up, it prints the correct Kafka version:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

I try to send a message like this:

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Sending fails on the client side with:

UnknownServerException: The server experienced an unexpected error when processing the request

In the server console I get the message "Magic v1 does not support record headers":

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Googling suggests a version conflict, but the versions seem to match (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).

Any clues? Thanks!

Edit: I have narrowed down the source of the problem. Sending plain Strings works, but sending JSON via the JsonSerializer produces the error above (which fits the error message, since JsonSerializer adds type information to the record headers, while StringSerializer does not). Here is my producer config:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

Solution

  • Solved. The problem was neither the broker, some Docker cache, nor the Spring app.

    The problem was a console consumer that I was running in parallel for debugging. It was an "old" consumer, started with kafka-console-consumer.sh --topic=topic --zookeeper=...

    It actually prints a warning when started: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

    A "new" consumer with --bootstrap-server option should be used (especially when using Kafka 1.0 with JsonSerializer). Note: Using an old consumer here can indeed affect the producer.