Search code examples
scalaapache-kafkaavro

Kafka: Error serializing Avro message with Schema Registry


I'm trying to send ProducerRecords of my custom type to Kafka, but I'm getting the error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

I set up schema in Schema: GET

http://localhost:8081/subjects/documentCreations-key/versions/3

Response:

{
"subject": "documentCreations-key",
"version": 3,
"id": 1,
"schema": "\"string\""}

GET

http://localhost:8081/subjects/documentCreations-value/versions/4

Response

{
"subject": "documentCreations-value",
"version": 4,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"Document\",\"namespace\":\"com.bade\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"}]}"

}

Here is my Scala class:

class Document(val name: java.lang.String,
               val title: java.lang.String,
               val path: java.lang.String)

And the part with KafkaProducer:

class MyKafkaProducer {

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081")

  private val producer = new KafkaProducer[java.lang.String, Document](props)

  def sendCreateDocumentMessage(document: Document): RecordMetadata = {

    val documentRecord = new ProducerRecord[java.lang.String, Document](SharedConfig
      .documentCreationsTopic,
      document.name, document)

    producer.send(documentRecord).get()
  }

What am I missing? I see that I can implement SpecificRecord for my class, but I didn't see that as necessary in book/tutorials that I've been reading. Thanks!

EDITED: Fixed class name


Solution

  • Answering my own question. Apparently, (de)serialization is not done automatically (via reflection or something), but you have to generate the class from avro schema file. Posting my pom.xml if it will be helpful to someone:

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    
        <plugins>
    
            <!--force java 8-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
    
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>
    
            <plugin>
                <!-- Build an executable JAR -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
    
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/avro
                            </sourceDirectory>
                        </configuration>
    
                    </execution>
                </executions>
            </plugin>
            <!--force discovery of generated classes-->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
        </plugins>
    </build>
    
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
    
    <properties>
        <kafka.version>1.0.0</kafka.version>
        <confluent.version>4.0.0</confluent.version>
        <avro.version>1.8.2</avro.version>
    </properties>
    
    <dependencies>
    
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
        </dependency>
    
    </dependencies>
    

    I build it with following mvn commands:

    mvn clean:clean avro:schema compiler:compile scala:compile jar:jar