
how to use a whole hive database in spark and read sql queries from external files?


I am using the Hortonworks sandbox on Azure with Spark 1.6. I have a Hive database populated with TPC-DS sample data. I want to read SQL queries from external files and run them against the Hive dataset in Spark. I followed this topic, Using hive database in spark, but it only uses a single table from my dataset and it also writes the SQL query in the Spark code again. I need to define the whole database as my source and query against it; I think I should use DataFrames, but I am not sure how. I also want to import the SQL query from an external .sql file rather than writing the query again. Would you please guide me on how to do this? Thank you very much!


Solution

  • Spark can read data directly from Hive tables. You can create and drop Hive tables from Spark, and you can run all HiveQL operations through Spark as well. For this you need to use Spark's HiveContext.

    From the Spark documentation:

    Spark's HiveContext provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup.

    For more information, see the Spark documentation.
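
    For example, to query your entire TPC-DS database instead of a single table, you can point the HiveContext at that database and then run HiveQL against any of its tables. A minimal sketch (the database name tpcds and table name store_sales are assumptions; use your own names):

    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.sql.hive.HiveContext

    object QueryTpcds {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("tpcds-query"))
        val sqlContext = new HiveContext(sc)

        // Switch to the Hive database that holds the TPC-DS tables
        // ("tpcds" is an assumed name; replace it with your database)
        sqlContext.sql("USE tpcds")

        // List every table in the current database
        sqlContext.tableNames().foreach(println)

        // Load any table as a DataFrame and query it with HiveQL
        val storeSales = sqlContext.table("store_sales")
        storeSales.printSchema()
        sqlContext.sql("SELECT count(*) FROM store_sales").show()

        sc.stop()
      }
    }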

    To avoid writing SQL in your code, you can use a properties file: put all your Hive queries in it and reference them by key in your code.

    Below is an implementation with Spark's HiveContext that uses a properties file, in Spark Scala.

    package com.spark.hive.poc

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.hadoop.fs.{ FileSystem, Path }

    // Row and the Spark SQL data types used to build the schema
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{ StructType, StructField, StringType }

    object ReadPropertyFiles extends Serializable {

      val conf = new SparkConf().setAppName("read local file")
      conf.set("spark.executor.memory", "100M")
      conf.setMaster("local")

      val sc = new SparkContext(conf)
      val sqlContext = new HiveContext(sc)

      def main(args: Array[String]): Unit = {

        // Load the properties file (passed as the first argument) from HDFS
        val hadoopConf = new org.apache.hadoop.conf.Configuration()
        val fileSystem = FileSystem.get(hadoopConf)
        val propsPath = new Path(args(0))
        val inputStream = fileSystem.open(propsPath)
        val properties = new java.util.Properties
        properties.load(inputStream)
        inputStream.close()

        // Create an RDD from a text input directory
        val people = sc.textFile("/user/User1/spark_hive_poc/input/")
        // The schema is encoded in a string
        val schemaString = "name address"

        // Generate the schema based on the string of schema
        val schema =
          StructType(
            schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

        // Convert records of the RDD (people) to Rows
        val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
        // Apply the schema to the RDD
        val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
        peopleDataFrame.printSchema()

        // Register the DataFrame as a temporary table so HiveQL can query it
        peopleDataFrame.registerTempTable("tbl_temp")

        val data = sqlContext.sql(properties.getProperty("temp_table"))

        // Drop Hive table
        sqlContext.sql(properties.getProperty("drop_hive_table"))
        // Create Hive table
        sqlContext.sql(properties.getProperty("create_hive_table"))
        // Insert data into Hive table
        sqlContext.sql(properties.getProperty("insert_into_hive_table"))
        // Select data from the Hive table
        sqlContext.sql(properties.getProperty("select_from_hive")).show()

        sc.stop()
      }
    }
    

    Entries in the properties file:

    temp_table=select * from tbl_temp
    drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
    create_hive_table=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
    insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
    select_from_hive=select * from default.test_hive_tbl
    

    The spark-submit command to run this job:

    [User1@hadoopdev ~]$ spark-submit --num-executors 1 \
    --executor-memory 100M --total-executor-cores 2 --master local \
    --class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    /user/User1/spark_hive_poc/properties/sql.properties
    

    Note: the properties file location should be an HDFS path.
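
    If you prefer to keep each query in its own external .sql file, as asked in the question, the same approach works: read the whole file from HDFS into a string and pass it to sqlContext.sql. A minimal sketch, assuming a hypothetical query file path:

    import org.apache.hadoop.fs.{ FileSystem, Path }
    import scala.io.Source

    // Read the full text of a .sql file stored on HDFS
    // (the path below is only an example location)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val in = fs.open(new Path("/user/User1/spark_hive_poc/queries/query1.sql"))
    val query = Source.fromInputStream(in).mkString
    in.close()

    // Run the query against the Hive database through the HiveContext
    sqlContext.sql(query).show()

    Keep in mind that sqlContext.sql runs a single statement, so a file that contains several semicolon-separated queries would need to be split before being executed.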