Tags: maven, apache-kafka, avro, confluent-platform

"Error registering Avro schema" when trying to stream data to Kafka


I am trying to reproduce the serializer example found in Confluent's official documentation and stream data in Avro format to a Kafka topic.

Here's the code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch(Exception e) {
  e.printStackTrace();
}

But producer.send(record); causes the following error:

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store; error code: 50001
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:775)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
    at com.giorgos.currencies.TestFX.main(TestFX.java:134)

Note that I changed

ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);

to

ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);

in order to deal with the "diamond operator is not supported in -source 1.5" compiler error.
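
(As an aside, that compiler error usually just means Maven is building with its default 1.5 source level. A less invasive fix, assuming a Java 8 JDK, would be to set the compiler level in the pom's <properties> instead of avoiding the diamond operator:)

<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>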

Here's the content of the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.giorgos.currencies</groupId>
  <artifactId>giorgos-fx_currencies</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>george-fx_currencies</name>
  <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>apache-repo</id>
            <name>Apache Repository</name>
            <url>https://repository.apache.org/content/repositories/releases</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>


    <properties>
        <kafka.version>0.8.2.1</kafka.version>
        <kafka.scala.version>2.10</kafka.scala.version>
        <confluent.version>4.0.0</confluent.version>
        <avro.version>1.7.6</avro.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

   <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <finalName>uber-${project.artifactId}-${project.version}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

Also note that mvn package leads to a successful build without any errors reported. topic1 is also created successfully, but no data appears in the consumer, since the producer side fails to stream the data.
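
(For reference, one way to check the topic from the consumer side, assuming the Confluent CLI tools are on the PATH, is the Avro console consumer:)

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic1 \
    --from-beginning --property schema.registry.url=http://localhost:8081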

I use

java -cp target/uber-giorgos-fx_currencies-1.0-SNAPSHOT.jar com.giorgos.currencies.TestFX

to run the code.


Solution

  • I guess your problem is that your message key is a String and not an Avro object. The KafkaAvroSerializer tries to register an Avro schema for the key as well, and the schema of a plain Java String is just "string", which is exactly what the error message shows. Try the following producer properties:

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    

    and change the producer record to

    ProducerRecord<String, Object> record = new ProducerRecord<String, Object>("topic1", key, avroRecord);
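
    Putting both changes together, here is a minimal sketch of the corrected producer (same topic, schema, and registry URL as in your question; the typed generics and the close() at the end are my additions):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // String keys go through StringSerializer; only the value is Avro
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");

    // Typed producer: String key, Avro (Object) value
    KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(props);

    String userSchema = "{\"type\":\"record\",\"name\":\"myrecord\"," +
                        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    Schema schema = new Schema.Parser().parse(userSchema);
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1", "value1");

    ProducerRecord<String, Object> record =
        new ProducerRecord<String, Object>("topic1", "key1", avroRecord);
    try {
      producer.send(record);
    } finally {
      producer.close(); // flushes buffered records before the JVM exits
    }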
    

    Hope that helps.