Tags: java, apache-kafka, spring-cloud-stream, spring-kafka, confluent-schema-registry

Spring Cloud Stream Kafka with Confluent is not producing same message as Spring Kafka with Confluent


I would like to use Spring Cloud Stream Kafka for my Java/Spring service, and I need to produce Confluent-serialized messages because I have .NET and NodeJS clients that consume my messages through Confluent APIs.

So far, Spring Kafka with the Confluent serializer is working for us, while Spring Cloud Stream Kafka with the Confluent serializer is giving us problems.

To demonstrate the difference between the two cases, I have created two example repositories on GitHub, each containing only the code needed to produce a simple message:

  1. With Spring Kafka and Confluent: https://github.com/donalthurley/springKafkaAvro

  2. With Spring Cloud Stream Kafka and Confluent: https://github.com/donalthurley/springCloudKafkaAvro

I think I have configured the useNativeEncoding flag and the Confluent serializer settings correctly for the Spring Cloud application; the relevant configuration can be seen in the source code here: https://github.com/donalthurley/springCloudKafkaAvro/blob/master/src/main/resources/application.yaml#L8

      kafka:
        binder:
          useNativeEncoding: true
          brokers: 127.0.0.1:9092
        bindings:
          output:
            producer:
              configuration:
                schema.registry.url: http://127.0.0.1:8081
                key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

I send the same simple message from my Spring Kafka application and from my Spring Cloud Stream Kafka application; the logs show:

Producing Kafka person event: {"lastName": "Doe", "firstName": "John"}

When I use the Kafka Topics UI browser from my Docker Kafka environment (see https://hub.docker.com/r/landoop/fast-data-dev/) and view the raw message data, it is different in the two cases.

The Spring Kafka message looks correct: the browser recognises and displays the fields inside the message value.

[
  {
    "topic": "test_spring_kafka",
    "key": "3197449393600061094",
    "value": {
      "lastName": "Doe",
      "firstName": "John"
    },
    "partition": 0,
    "offset": 0
  }
]

In the Spring Cloud Stream Kafka raw data, the browser fails to recognise the fields, which shows that the two messages are not serialized the same way.

[
  {
    "topic": "test_spring_cloud_kafka",
    "key": "-6214497758709596999",
    "value": "\u0006Doe\bJohn",
    "partition": 0,
    "offset": 0
  }
]
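The raw value `\u0006Doe\bJohn` is consistent with plain Avro binary encoding, which prefixes each string with its length as a zig-zag varint (3 → 0x06, 4 → 0x08), whereas a Confluent-serialized value carries a 5-byte header (magic byte 0x00 plus the 4-byte schema-registry id) before the same Avro bytes, which is what the topic UI needs in order to look up the schema and render the fields. The sketch below is illustrative only, not the serializers' actual code, and assumes the schema lists lastName before firstName as the record above suggests:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroFramingDemo {

    // Plain Avro binary: each string field is its zig-zag-varint length
    // followed by the UTF-8 bytes, with no header at all.
    static byte[] plainAvroRecord(String lastName, String firstName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, lastName);
        writeString(out, firstName);
        return out.toByteArray();
    }

    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        long zigzag = ((long) utf8.length) << 1; // zig-zag of a non-negative length
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
        out.write(utf8, 0, utf8.length);
    }

    // Confluent wire format: magic byte 0x00, then the 4-byte big-endian
    // schema-registry id, then the plain Avro bytes.
    static byte[] confluentFramed(int schemaId, byte[] avroBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x00);
        out.write((schemaId >>> 24) & 0xFF);
        out.write((schemaId >>> 16) & 0xFF);
        out.write((schemaId >>> 8) & 0xFF);
        out.write(schemaId & 0xFF);
        out.write(avroBytes, 0, avroBytes.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] plain = plainAvroRecord("Doe", "John");
        // Decodes to \u0006Doe\bJohn -- matching the Spring Cloud Stream raw value
        System.out.println(new String(plain, StandardCharsets.UTF_8));
        byte[] framed = confluentFramed(1, plain);
        System.out.println(framed.length); // 5-byte header + 9-byte payload
    }
}
```

A topic UI backed by the schema registry only decodes values that start with this 5-byte header, which would explain why the Spring Cloud Stream message shows up as an opaque string.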

I think there may be an issue producing Confluent messages with Spring Cloud Stream Kafka, while the Spring Kafka implementation produces them correctly. But maybe I am missing something in my implementation; could someone help me with this problem?


Solution

  • The problem is with the way you configure useNativeEncoding: it was not taking effect. This configuration should work:

    spring:
      application:
        name: springCloudKafkaAvro
      cloud:
        stream:
          schemaRegistryClient:
            endpoint: http://127.0.0.1:8081
          kafka:
            binder:
              brokers: 127.0.0.1:9092
            bindings:
              output:
                producer:
                  configuration:
                    schema.registry.url: http://127.0.0.1:8081
                    key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          bindings:
            output:
              destination: test_spring_cloud_kafka
              producer:
                useNativeEncoding: true

    Notice how useNativeEncoding has moved from your original config: it is a per-binding producer property that belongs under spring.cloud.stream.bindings.<binding>.producer, not under the kafka.binder section, so where you had placed it the binder silently ignored it and applied its own (non-Confluent) serialization.