I'm using `AvroKeyInputFormat` to read Avro files:
val records = sc.newAPIHadoopFile[AvroKey[T], NullWritable, AvroKeyInputFormat[T]](path)
  .map(_._1.datum())
Because I need to reflect over the schema in my job, I get the Avro schema like this:
val schema = records.first.getSchema
Unfortunately, this fails if the Avro files in `path` are empty (they include the writer schema, but no records).

Is there an easy way to load only the Avro schema with Spark, even if there are no records?
I've found a solution (inspired by `com.databricks.spark.avro.DefaultSource`):
import java.net.URI

import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext

/**
 * Loads a schema from Avro files in `directory`. This method also works if none
 * of the Avro files contain any records.
 */
def schema(directory: String)(implicit sc: SparkContext): Schema = {
  val fs = FileSystem.get(new URI(directory), sc.hadoopConfiguration)
  val it = fs.listFiles(new Path(directory), false)

  // Find the first .avro file in the directory (non-recursively).
  var avroFile: Option[FileStatus] = None
  while (it.hasNext && avroFile.isEmpty) {
    val fileStatus = it.next()
    if (fileStatus.isFile && fileStatus.getPath.getName.endsWith(".avro")) {
      avroFile = Some(fileStatus)
    }
  }

  avroFile.fold {
    throw new Exception(s"No avro files found in $directory")
  } { file =>
    // Only the file header is read here; the writer schema lives in the
    // header, so this works even when the file contains no records.
    val in = new FsInput(file.getPath, sc.hadoopConfiguration)
    try {
      val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
      try {
        reader.getSchema
      } finally {
        reader.close()
      }
    } finally {
      in.close()
    }
  }
}
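A minimal usage sketch, assuming a local `SparkContext` and a hypothetical directory of `.avro` part files (the path and app name are made up for illustration):

```scala
import org.apache.avro.Schema
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup; in a real job the context already exists.
implicit val sc: SparkContext = new SparkContext(
  new SparkConf().setAppName("schema-example").setMaster("local[*]"))

val dir = "hdfs:///data/events"          // hypothetical directory of .avro files
val writerSchema: Schema = schema(dir)   // succeeds even if every file is empty
println(writerSchema.toString(true))     // pretty-print the schema as JSON
```

Since the schema comes from the file header rather than from `records.first`, the RDD itself never has to be materialized just to inspect the schema.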