Search code examples
apache-kafkaavroapache-kafka-streamsspring-cloud-streamconfluent-schema-registry

Incompatible Avro messages between Spring Cloud Stream Kafka Stream and native Kafka Stream applications and producers


Sample applications to verify this can be found in https://github.com/codependent/event-carried-state-transfer/tree/avro

  • kafka-xxx: native applications
  • spring-boot-xxx: Spring Cloud Stream applications

The problem is Avro messages produced by a native Kafka Producer can't be unmarshalled by Spring Cloud Stream Applications e.g:

Native Kafka Producer (kafka-customer-service project)

@Component
class CustomerProducer {

    private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ProducerConfig.CLIENT_ID_CONFIG] = "kafka-customer-producer"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = IntegerSerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }


    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
}

Spring Cloud Stream Kafka Stream (spring-boot-shipping-service)

    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<Int, Customer>, ...): KStream<Int, OrderShippedEvent> {

        val serdeConfig = mapOf(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = SpecificAvroSerde<Customer>()
        customerSerde.configure(serdeConfig, false)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)

        ...

In this case, the Spring Cloud Stream application unmarshals an empty Customer DTO: {"id": 0, "name": "", "address": ""}

Now trying the other way around, a Spring Cloud Stream Producer and a native Kafka Streams application:

Spring Cloud Stream Kafka Producer (spring-boot-customer-service)

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
      bindings:
        output:
          destination: customer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081

---

@Service
class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
   ...
   val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.MESSAGE_KEY, customer.id).build()
   customerKafkaProducer.output().send(message)
   ...

Native Kafka Stream (kafka-shipping-service)

    val builder = StreamsBuilder()

    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray()::class.java.name)
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
    streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    //val byteArraySerde = Serdes.ByteArray()
    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val customerStream = builder.stream<Int, Customer>("customer",
        Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

    val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
        Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
            .withKeySerde(intSerde)
            .withValueSerde(customerSerde)

    val customerTable = customerStream
        .map { key, value -> KeyValue(key, value) }
        .groupByKey(Serialized.with(intSerde, customerSerde))
        .reduce({ _, y -> y }, stateStore)

In this case the native application directly crashes with an exception (org.apache.kafka.common.errors.SerializationException: Unknown magic byte!)

Exception in thread "kafka-shipping-service-b89157ba-b21f-46ba-911d-97f6080d477e-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Disconnected from the target VM, address: '127.0.0.1:57856', transport: 'socket'

Process finished with exit code 0

How can I ensure the compatibility of messages generated by Spring Cloud Stream producers / Native Kafka producers in an heterogeneous corporate environment where there will be consumers which could be indistinctively Spring Cloud Stream Katfka Stream applications and Native Kafka Streams?


Solution

  • @codependent With your first case - you have a native Kafka producer that uses KafkaAvroSerializer and Spring Cloud Stream Kafka Streams consumer that uses avro deserializers provided by Spring Cloud Stream. That won't work as you are using non-compatible serializers/deserializers. In order to fix this, on the Spring Cloud Stream side you need to enable useNativeDecoding and provide avro Serde's (SpecificAvroSerde). That way you are using the same serialization/deserialization strategy across.

    With your second case, you are getting the classic error (Unknown magic byte!) when serializers don't match. Again the same problem. You have a Spring Cloud Stream producer that uses the serializers from the framework, but on the consuming side using SpecificAvroSerde. In order to fix here, you can turn on useNativeEncoding on the producer side and let the avro serializer be used. Or wrap the Avro serializer from Spring Cloud Stream in a Serde and provide that on the consumer.

    I think the bottom line here is that, when using avro as your data exchange format, you need to make sure that the same serialization/deserialization strategies are used across your micro services chain that rely on this data.