hadoop · apache-spark · hive · parquet

Generate metadata for parquet files


I have a Hive table that is built on top of a load of external Parquet files. The metadata files should have been generated by the Spark job, but because the metadata flag was set to false they were not. I'm wondering whether it is possible to restore them in some painless way. The structure of the files is as follows:

/apps/hive/warehouse/test_db.db/test_table/_SUCCESS
/apps/hive/warehouse/test_db.db/test_table/_common_metadata
/apps/hive/warehouse/test_db.db/test_table/_metadata
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-20
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-21
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-22
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-23
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-24
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-25
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-26
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-27
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-28
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-29
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-30

Let's assume that the _metadata file is missing or outdated. Is there a way to recreate it via a Hive command, or otherwise generate it, without having to rerun the whole Spark job?
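
For reference, the metadata flag I mean is the Parquet summary-metadata switch, which the job disabled roughly like this (the exact property name is my assumption about what was set):

    // Presumed setting that suppressed the _metadata/_common_metadata summary files;
    // "parquet.enable.summary-metadata" is the standard parquet-mr switch for this.
    spark.sparkContext.hadoopConfiguration
      .set("parquet.enable.summary-metadata", "false")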


Solution

  • OK, so here is the drill: metadata can be accessed directly using the Parquet tools. First you'll need to get the footers for your Parquet files:

    import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
    
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.conf.Configuration
    
    val conf = spark.sparkContext.hadoopConfiguration
    
    def getFooters(conf: Configuration, path: String) = {
      val fs = FileSystem.get(conf)
      val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
      footers
    }
    
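    If you just want to sanity-check what the footers contain, a minimal sketch (using the imports above and the table path from the question) could, for instance, print the row count per data file:

    // Sketch: sum the row counts of each file's row groups and print them per file.
    getFooters(conf, "/apps/hive/warehouse/test_db.db/test_table")
      .asScala
      .foreach { footer =>
        val rowCount = footer.getParquetMetadata.getBlocks.asScala.map(_.getRowCount).sum
        println(s"${footer.getFile}: $rowCount rows")
      }
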

    Now you can get your file metadata as follows:

    def getFileMetadata(conf: Configuration, path: String) = {
      getFooters(conf, path)
        .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
    }
    

    Then you can read the metadata of your Parquet file:

    getFileMetadata(conf, "/tmp/foo").headOption
    
    // Option[scala.collection.mutable.Map[String,String]] =
    //   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
    //     {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}}
    //     {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
    
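    Since the value stored under org.apache.spark.sql.parquet.row.metadata is just the Spark schema serialized as JSON, you can turn it back into a StructType if that is useful. A small sketch, reusing the key name and path from the example above:

    import org.apache.spark.sql.types.{DataType, StructType}

    // Parse the Spark schema JSON stored in the Parquet key-value metadata
    // back into a StructType (key and path as in the example above).
    val schema = getFileMetadata(conf, "/tmp/foo").headOption
      .flatMap(_.get("org.apache.spark.sql.parquet.row.metadata"))
      .map(DataType.fromJson(_).asInstanceOf[StructType])
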

    We can also use the extracted footers to write a standalone metadata file when needed:

    import org.apache.parquet.hadoop.ParquetFileWriter
    
    def createMetadata(conf: Configuration, path: String) = {
      val footers = getFooters(conf, path)
      ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
    }
    
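    For the table layout from the question, the call would then be something along these lines (note that writeMetadataFile is deprecated in newer parquet-mr releases, but it still works here):

    // Rewrites _metadata (and _common_metadata) next to the partition directories.
    createMetadata(conf, "/apps/hive/warehouse/test_db.db/test_table")
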

    I hope this answers your question. You can read more about Spark DataFrames and metadata in awesome-spark's spark-gotchas repo.