Tags: java, azure, avro, azure-eventhub

Azure Event Hub Avro Schema registry giving error in Java


I have written the Gradle build script below to generate Java classes from an Avro schema (.avsc) file.

Here is the .avsc file.

{
  "namespace": "com.azure.schemaregistry.samples",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "double"
    }
  ]
}

Build Script :

import org.apache.avro.tool.SpecificCompilerTool

buildscript {
    dependencies {
        // Add the Avro code generation to the build dependencies so that it can be used in a Gradle task.
        classpath group: 'org.apache.avro', name: 'avro-tools', version: '1.11.1'
    }
}

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.8'
    id 'io.spring.dependency-management' version '1.1.4'
}

def avroSchemasDir = "src/main/resources/avro"
def avroCodeGenerationDir = "build/avro"

group = 'com.eventhub'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    implementation group: 'com.microsoft.azure', name: 'msal4j', version: '1.14.2'
    implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
    implementation group: 'com.azure', name: 'azure-data-schemaregistry', version: '1.4.2'
    implementation group: 'com.azure', name: 'azure-identity', version: '1.10.4'

    implementation group: 'com.azure', name: 'azure-data-schemaregistry-avro', version: '1.0.0-beta.5'
    implementation "org.apache.avro:avro:1.11.0"
}

tasks.named('bootBuildImage') {
    builder = 'paketobuildpacks/builder-jammy-base:latest'
}

tasks.register('avroCodeGeneration') {
    // Define the task inputs and outputs for the Gradle up-to-date checks.
    inputs.dir(avroSchemasDir)
    outputs.dir(avroCodeGenerationDir)
    // The Avro code generation logs to the standard streams. Redirect the standard streams to the Gradle log.
    logging.captureStandardOutput(LogLevel.INFO)
    logging.captureStandardError(LogLevel.ERROR)
    doLast {
        // Run the Avro code generation.
        new SpecificCompilerTool().run(System.in, System.out, System.err, List.of(
                "-encoding", "UTF-8",
                "-string",
                "-fieldVisibility", "private",
                "-noSetters",
                "schema", "$projectDir/$avroSchemasDir".toString(), "$projectDir/$avroCodeGenerationDir".toString()
        ))
    }
}

tasks.withType(JavaCompile).configureEach {
    // Make Java compilation tasks depend on the Avro code generation task.
    dependsOn('avroCodeGeneration')
}

And here is the Java Main class that deserializes the data:

public class Main {

    public static void main(String[] args) throws IOException {
        Order order = deserialize();
        System.out.println(order.getAmount());
    }

    public static Order deserialize() throws IOException {
        TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                .tenantId("tenant")
                .clientId("id")
                .clientSecret("secret")
                .build();

        SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace("https://test.servicebus.windows.net")
                .credential(tokenCredential)
                .buildAsyncClient();
        SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
                .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                .schemaGroup("test-group")
                .avroSpecificReader(true)
                .autoRegisterSchema(true)
                .buildSerializer();
        InputStream inputStream = getDataToDeserialize();

        return schemaRegistryAvroSerializer.deserialize(inputStream, TypeReference.createInstance(Order.class));
    }

    static InputStream getDataToDeserialize() throws IOException {
        Order order = new Order("10", 100.0);
        System.out.println("OUTPUT : " + order.getSchema().toString());
        return new ByteArrayInputStream(order.toByteBuffer().array());
    }
}

I am getting the error below:

Exception in thread "main" java.lang.IllegalStateException: Illegal format: unsupported record format indicator in payload
    at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.lambda$deserializeAsync$2(SchemaRegistryAvroSerializer.java:79)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4480)
    at reactor.core.publisher.Mono.block(Mono.java:1711)
    at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.deserialize(SchemaRegistryAvroSerializer.java:50)

I am not sure what is wrong with the code above, as I took the example from here.

Please let me know if there is something wrong in the code.


Solution

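  • Exception in thread "main" java.lang.IllegalStateException: Illegal format: unsupported record format indicator in payload.

    This error means the bytes passed to deserialize() do not start with the record format indicator that SchemaRegistryAvroSerializer writes in front of every payload it produces. In the question, the bytes come from order.toByteBuffer(), which uses Avro's own single-object encoding, so the serializer does not recognize them. Serialize the data with the same SchemaRegistryAvroSerializer before deserializing it, and make sure the data matches the Avro schema expected by the deserializer. Mismatched schemas can also cause deserialization errors.

    • If the data being deserialized was serialized using an older version of the schema, check that the deserializer can handle schema evolution properly. Avro supports schema evolution, but you need to check that the deserializer is configured to handle schema changes gracefully.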
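    To see which kind of payload you actually have, you can inspect its first bytes. The sketch below is only a debugging aid, not part of the Azure SDK. It assumes that the beta serializer's payload begins with four zero bytes (the record format indicator) and that the generated toByteBuffer() writes Avro's single-object marker 0xC3 0x01. The class name PayloadHeaderCheck and the labels it returns are made up for illustration.

```java
import java.util.Arrays;

public class PayloadHeaderCheck {

    // Assumption: the Schema Registry Avro serializer (beta) prefixes payloads with four zero bytes.
    static final byte[] REGISTRY_INDICATOR = {0x00, 0x00, 0x00, 0x00};

    // Avro single-object encoding starts with the two-byte marker C3 01.
    static final byte[] AVRO_SINGLE_OBJECT_MARKER = {(byte) 0xC3, 0x01};

    // Returns true when the payload begins with the given prefix.
    static boolean startsWith(byte[] payload, byte[] prefix) {
        return payload.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(payload, prefix.length), prefix);
    }

    // Labels a payload by its leading bytes; the labels are hypothetical names.
    static String describe(byte[] payload) {
        if (startsWith(payload, REGISTRY_INDICATOR)) {
            return "schema-registry";
        }
        if (startsWith(payload, AVRO_SINGLE_OBJECT_MARKER)) {
            return "avro-single-object";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // Shaped like the output of order.toByteBuffer(): marker, 8-byte fingerprint, record body.
        byte[] fromToByteBuffer = {(byte) 0xC3, 0x01, 1, 2, 3, 4, 5, 6, 7, 8, 0x04, '1', '0'};
        // Shaped like a serializer payload: only the leading zero bytes matter for this check.
        byte[] fromSerializer = new byte[40];

        System.out.println(describe(fromToByteBuffer));
        System.out.println(describe(fromSerializer));
    }
}
```

    If describe() reports the single-object marker for the bytes you pass to deserialize(), they were not produced by the Schema Registry serializer, which would explain the exception above.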

    Here is Java code that serializes the data with the Azure Schema Registry Avro Serializer and then deserializes it with the same serializer:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;

    import com.azure.core.credential.TokenCredential;
    import com.azure.core.util.serializer.TypeReference;
    import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
    import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
    import com.azure.identity.ClientSecretCredentialBuilder;

    public class AvroSerializationExample {

        public static void main(String[] args) {
            // Sample data for serialization
            Order order = new Order("123", 500.0);

            // Serialize with the Schema Registry serializer so that the payload
            // carries the header that deserialize() expects
            byte[] serializedData = serialize(order);

            // Deserialize the data
            Order deserializedOrder = deserialize(serializedData);

            // Display deserialized data
            System.out.println("Deserialized Order ID: " + deserializedOrder.getId());
            System.out.println("Deserialized Order Amount: " + deserializedOrder.getAmount());
        }

        // Builds the serializer using the same credential and namespace
        // placeholders as the question; replace them with your own values
        private static SchemaRegistryAvroSerializer buildSerializer() {
            TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                    .tenantId("tenant")
                    .clientId("id")
                    .clientSecret("secret")
                    .build();

            SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                    .fullyQualifiedNamespace("https://test.servicebus.windows.net")
                    .credential(tokenCredential)
                    .buildAsyncClient();

            return new SchemaRegistryAvroSerializerBuilder()
                    .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                    .schemaGroup("test-group")
                    .avroSpecificReader(true)
                    .autoRegisterSchema(true)
                    .buildSerializer();
        }

        public static byte[] serialize(Order order) {
            SchemaRegistryAvroSerializer serializer = buildSerializer();

            // Convert the Order object to a byte array using the serializer;
            // serialize() takes only the output stream and the value
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            serializer.serialize(outputStream, order);
            return outputStream.toByteArray();
        }

        public static Order deserialize(byte[] data) {
            SchemaRegistryAvroSerializer deserializer = buildSerializer();

            // Deserialize the byte array back into an Order object
            InputStream inputStream = new ByteArrayInputStream(data);
            return deserializer.deserialize(inputStream, TypeReference.createInstance(Order.class));
        }
    }
    
    // Order is the class generated from the .avsc schema by the avroCodeGeneration
    // Gradle task in the question (namespace com.azure.schemaregistry.samples).
    // Use that generated class rather than a hand-written one: the serializer
    // needs an Avro record that carries its schema, which a plain POJO does not.
    

    [Screenshot: Deserialization]

    [Screenshot: Events]