Search code examples
scalaapache-sparkapache-spark-sqlhivecontext

Append transformed columns to spark dataframe using scala


I am trying to access a hive table and extract and transform certain columns from the table/dataframe and then put those new columns in a new dataframe. I am trying to do it in this way -

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val hiveDF = sqlContext.sql("select * from table_x")

val system_generated_id = hiveDF("unique_key")
val application_assigned_event_id = hiveDF("event_event_id")

val trnEventDf = sqlContext.emptyDataFrame
trnEventDf.withColumn("system_generated_id",lit(system_generated_id))

It builds without any error with sbt. But when I try to run it I receive the following error -

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:354) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:353) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:353) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:347) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) at org.apache.spark.sql.DataFrame.(DataFrame.scala:133) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188) at bacon$.main(bacon.scala:31) at bacon.main(bacon.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)

I would like to understand what causes this error and if there is any other way to accomplish what I am trying to do.


Solution

  • In general you do not need to create a new df for this. When you transform the df by adding a unique Id to it, you get your desired df. If you would like to save it, simply save it as a new hive table.