Tags: apache-spark, avro, spark-avro

How to read Avro schema from empty RDD?


I'm using AvroKeyInputFormat to read Avro files:

val records = sc.newAPIHadoopFile[AvroKey[T], NullWritable, AvroKeyInputFormat[T]](path)
  .map(_._1.datum())

Because I need to reflect over the schema in my job, I get the Avro schema like this:

val schema = records.first.getSchema

Unfortunately, this fails if the Avro files in `path` are empty (they include the writer schema, but no records), because `first` throws on an empty RDD.

Is there an easy way to load only the Avro schema with Spark, even if there are no records?


Solution

  • I've found a solution (inspired by com.databricks.spark.avro.DefaultSource):

    import java.net.URI

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.mapred.FsInput
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.spark.SparkContext

    /**
      * Loads the writer schema from the Avro files in `directory`. This method also
      * works if none of the Avro files contain any records, because the schema is
      * read from the file header rather than from a record.
      */
    def schema(directory: String)(implicit sc: SparkContext): Schema = {
      val fs = FileSystem.get(new URI(directory), sc.hadoopConfiguration)
      val it = fs.listFiles(new Path(directory), false)
    
      var avroFile: Option[FileStatus] = None
    
      while (it.hasNext && avroFile.isEmpty) {
        val fileStatus = it.next()
    
        if (fileStatus.isFile && fileStatus.getPath.getName.endsWith(".avro")) {
          avroFile = Some(fileStatus)
        }
      }
    
      avroFile.fold[Schema] {
        throw new IllegalArgumentException(s"No avro files found in $directory")
      } { file =>
        val in = new FsInput(file.getPath, sc.hadoopConfiguration)
        try {
          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
          try {
            reader.getSchema
          } finally {
            reader.close()
          }
        } finally {
          in.close()
        }
      }
    }
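
    A minimal usage sketch (the directory path and application name below are
    placeholders, and this assumes Avro and Hadoop dependencies are on the
    classpath):

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical local SparkContext; in a real job you would reuse the
    // context your application already has.
    implicit val sc: SparkContext = new SparkContext(
      new SparkConf().setAppName("schema-example").setMaster("local[*]"))

    // Works even when every .avro file in the directory holds only a
    // writer schema and zero records.
    val writerSchema = schema("/data/events")

    // Schema#toString(true) pretty-prints the schema as JSON.
    println(writerSchema.toString(true))
    ```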