Tags: hive, apache-spark, scala-2.10, apache-spark-sql

How to enable SQL on SchemaRDD via the JDBC interface? (Is it even possible?)


UPDATED problem statement

We are using Spark 1.2.0 (Hadoop 2.4). We have defined SchemaRDDs over data files in HDFS and would like to make them queryable as tables via HiveServer2. We are hitting runtime exceptions when calling saveAsTable and would like guidance on how to proceed.
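For context, this is roughly the JDBC access we are hoping to end up with: a plain JDBC client connecting to HiveServer2 and selecting from a table backed by one of these SchemaRDDs. A minimal sketch, with host, port, and credentials as placeholders:

import java.sql.DriverManager

object JdbcReadDemo {
  def main(args: Array[String]) {
    // Standard HiveServer2 JDBC driver; connection details below are placeholders
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://hiveserver-host:10000/default", "user", "")
    try {
      val rs = conn.createStatement().executeQuery(
        "SELECT ZIPCODE, STATE FROM allzipstable LIMIT 10")
      while (rs.next()) {
        println(rs.getString("ZIPCODE") + "\t" + rs.getString("STATE"))
      }
    } finally {
      conn.close()
    }
  }
}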

Source code:

package foo.bar

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._
import org.apache.spark.sql.hive._

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Demo")
    val sc = new SparkContext(conf)

    // Wrap the SparkContext in a HiveContext to get access to the Hive metastore
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Create an RDD
    val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv")

    // The schema is encoded in a string
    val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions"

    // Generate the schema from the comma-separated field names (all columns as nullable strings)
    val schema =
      StructType(
        schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert each record of the zip RDD to a Row; the last column (regions) is left empty
    val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), ""))

    // Apply the schema to the RDD.
    val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema)

    // Save the SchemaRDD as a Hive table via the HiveContext (this is the call that throws)
    zipSchemaRDD.saveAsTable("allzipstable")

  }
}

spark-submit command:

./bin/spark-submit  --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10

Exception at runtime on the node:

15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

Another attempt:

package foo.bar

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql._

case class AllZips(
  ODSMEMBERID: String,
  ZIPCODE: String,
  STATE: String,
  TEST_SUPPLIERID: String,
  ratio_death_readm_low: String,
  ratio_death_readm_high: String,
  regions: String)

object HiveDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HiveDemo")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._
    val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), ""))
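    // createSchemaRDD comes into scope via "import hiveContext._" above and
    // infers the table schema from the fields of the AllZips case class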
    val allZipsSchemaRDD = createSchemaRDD(allZips)
    allZipsSchemaRDD.saveAsTable("allzipstable")
  }
}

Exception on the node:

15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
)
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, allzipstable, false, None
 LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126)
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108)
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22)
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook

Solution

  • The createSchemaRDD code snippet above works fine on Spark 1.2.1 (a sketch of querying the resulting table over JDBC follows below).

    There was a CTAS (CREATE TABLE AS SELECT) defect in 1.2.0, which is what caused the "Unresolved plan found" failures above.
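  • For anyone with the same end goal (SQL over JDBC against these tables): once saveAsTable succeeds, the table is registered in the Hive metastore, so any HiveServer2-compatible endpoint sharing that metastore can serve it. A rough sketch using the Thrift JDBC/ODBC server that ships with Spark, assuming it picks up the same hive-site.xml; host, port, and master are illustrative:

    # Start Spark's Thrift JDBC/ODBC server (it speaks the HiveServer2 protocol)
    ./sbin/start-thriftserver.sh --master yarn-client

    # Query the saved table over JDBC, for example with beeline
    ./bin/beeline -u jdbc:hive2://localhost:10000/default -e "SELECT COUNT(*) FROM allzipstable"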