I created an Apache Spark project.
Version:
I need to save a simple DataFrame to an HBase table. For this I started HBase 1.2.0 in a Docker container (https://github.com/zhao-y/docker-hbase-pseudo) and created the following table:
hbase(main):002:0> create "table1", "cf1", "cf2", "cf3", "cf4", "cf5", "cf6", "cf7", "cf8"
0 row(s) in 1.4440 seconds
To save the DataFrame to HBase I use shc (https://github.com/hortonworks-spark/shc):
dataFrame.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
Code:
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test

class SparkTest {

  case class HBaseRecord(
    col0: String,
    col1: Boolean,
    col2: Double,
    col3: Float,
    col4: Int,
    col5: Long,
    col6: Short,
    col7: String,
    col8: Byte)

  object HBaseRecord {
    def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""
      HBaseRecord(s,
        i % 2 == 0,
        i.toDouble,
        i.toFloat,
        i,
        i.toLong,
        i.toShort,
        s"String$i: $t",
        i.toByte)
    }
  }

  @Test
  def bar(): Unit = {
    val sparkSession = SparkSession.builder
      .appName("SparkTest")
      .master("local[*]")
      .config("spark.testing.memory", 2147480000)
      .getOrCreate()

    val data = (0 to 255).map { i => HBaseRecord(i, "extra") }

    val dataFrame = sparkSession.createDataFrame(data)
    dataFrame.show

    dataFrame.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
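The catalog referenced above is an shc catalog JSON string that maps the DataFrame columns to HBase column families and qualifiers. As an illustration for the HBaseRecord schema (the family/qualifier assignments below are only a sketch, not necessarily the exact mapping I use), it could be defined like this:

// Illustrative shc catalog: col0 is the row key, the remaining
// columns are spread across the table's column families cf1..cf8.
def catalog: String =
  s"""{
     |  "table": {"namespace": "default", "name": "table1"},
     |  "rowkey": "key",
     |  "columns": {
     |    "col0": {"cf": "rowkey", "col": "key", "type": "string"},
     |    "col1": {"cf": "cf1", "col": "col1", "type": "boolean"},
     |    "col2": {"cf": "cf2", "col": "col2", "type": "double"},
     |    "col3": {"cf": "cf3", "col": "col3", "type": "float"},
     |    "col4": {"cf": "cf4", "col": "col4", "type": "int"},
     |    "col5": {"cf": "cf5", "col": "col5", "type": "bigint"},
     |    "col6": {"cf": "cf6", "col": "col6", "type": "smallint"},
     |    "col7": {"cf": "cf7", "col": "col7", "type": "string"},
     |    "col8": {"cf": "cf8", "col": "col8", "type": "tinyint"}
     |  }
     |}""".stripMargin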
Error:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/TableDescriptor
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:63)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at SparkTest.bar(SparkTest.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.TableDescriptor
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 41 more
val sparkSession = SparkSession.builder
  .appName("SparkTest")
  .master("local[*]")
  .config("spark.testing.memory", 2147480000)
  .getOrCreate()

means you are running in local mode and the HBase client jar is missing from your classpath. (If it is already on the classpath, you can change the dependency scope to runtime rather than compile.)
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.4</version>
</dependency>
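If the project is built with sbt instead of Maven, the equivalent dependency would be (assuming sbt; adjust the version to match your HBase client):

// build.sbt: pulls in the HBase client classes that shc needs at runtime
libraryDependencies += "org.apache.hbase" % "hbase-client" % "2.1.4"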
If you are running locally from IntelliJ, you can check that the hbase-client jar is listed in the module's .iml file.
The normal way when running in cluster or client mode (not local) is to add the HBase classpath:

export HBASE_CLASSPATH=$HBASE_CLASSPATH:`hbase classpath`

which adds all the HBase jars to the classpath.
To see which jars are actually on your classpath, the following helper prints them all:

def urlsinclasspath(cl: ClassLoader): Array[java.net.URL] = cl match {
  case null => Array()
  case u: java.net.URLClassLoader => u.getURLs() ++ urlsinclasspath(cl.getParent)
  case _ => urlsinclasspath(cl.getParent)
}
Call it like this and check whether the hbase-client jar appears in the output:

urlsinclasspath(getClass.getClassLoader).foreach(println)