
Databricks to_avro works only if schema is registered without specified event name and namespace


I'm using Databricks Runtime 10.0 with Spark 3.2.0 and Scala 2.12. I also depend on io.confluent:kafka-schema-registry-client:6.2.0, from which I use CachedSchemaRegistryClient to register schemas in Schema Registry like this:


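import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Schema Registry client used to register schemas.
@transient lazy val reg = new CachedSchemaRegistryClient(schemaRegistryUrl, schemaRegistryCacheCapacity, null)

// Registers the schema under the given subject name and returns its ID.
def registerSchema(name: String, schema: Schema): Int = {
  reg.register(name, new AvroSchema(schema))
}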

Now, this works in Spark as expected:

val dataframe = ...
val schema = toAvroType(dataframe.schema)
schemaRegistry.registerSchema("some_name", schema)

          

display(dataframe
       .select(struct(/*some fields*/).alias("body"))
       .select(to_avro('body, lit("some_name"), schemaRegistryUrl).as("body")))

I'm also able to deserialize the result. However, as soon as I make the following change to specify the correct record name and namespace:

val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")

Spark fails with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 853.0 failed 4 times, most recent failure: Lost task 2.3 in stage 853.0 (TID 21433) (10.206.5.9 executor driver): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)

In both cases, I can see the schema correctly registered in Schema Registry. Any idea why the second scenario doesn't work?


Solution

  • From https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html:

    "If the default output schema of to_avro matches the schema of the target subject... Otherwise, you must provide the schema of the target subject in the to_avro function:"

     to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value")
    

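    So to_avro with three arguments works because the converted output Avro schema is of record type, the record name is topLevelRecord, and there is no namespace by default. That is the same schema that toAvroType produces when called without a name or namespace, as in the first scenario.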

    With a specified name and namespace, the registered schema no longer matches that default, so the schema itself must additionally be passed to to_avro.
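
The mismatch described above can be seen by comparing the two schemas directly. This is a minimal sketch, assuming a spark-shell or notebook session with spark-avro on the classpath; `sparkSchema` is a hypothetical stand-in for `dataframe.schema`, and `nullable` is left at `false` to keep the comparison simple. The final comment only restates the four-argument call from the documentation quote using the names from the question; it is Databricks-specific and is not executed here.

```scala
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical stand-in for dataframe.schema from the question.
val sparkSchema = StructType(Seq(StructField("id", StringType, nullable = false)))

// Defaults: record name "topLevelRecord" and an empty namespace.
val defaultSchema = toAvroType(sparkSchema)

// Explicit record name and namespace, as in the failing scenario.
val namedSchema = toAvroType(sparkSchema, false, "some_name", "com.some.namespace")

println(defaultSchema.getFullName) // topLevelRecord
println(namedSchema.getFullName)   // com.some.namespace.some_name

// On Databricks, the registered schema would then be passed as a JSON string
// in the fourth argument (assumption: same names as in the question):
// to_avro($"body", lit("some_name"), schemaRegistryUrl, namedSchema.toString).as("body")
```

Because the two full names differ, the default output schema of to_avro cannot match a subject registered with the named schema, which is why the schema has to be supplied explicitly in that case.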