Tags: scala, apache-spark, apache-spark-sql

Getting a classpath error, but the jars are added


I am getting an error while submitting a Spark program. Below is the error:

client token: N/A
         diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: Can not load class 'brickhouse.udf.collect.NumericRange' when registering the function 'numeric_range', please make sure it is on the classpath;

I am using the spark-submit command below and referencing all the jars being used, but I still get an error related to the Brickhouse jar while creating a temporary function:

/usr/hdp/2.6.5.3015-8/spark2/bin/spark-submit --master yarn --deploy-mode cluster --class "com.test.example.Sample" /home/sshuser/project1_2.11-0.1.0-SNAPSHOT.jar \
--jars /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
--driver-class-path /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
--executor-class-path /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar

The Spark version is 2.3. I am using temporary functions in my code; below is my code snippet:

package com.test.example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import com.databricks.spark.xml._

object Sample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("XML Reading")
      .setMaster("yarn-cluster")
      .set("spark.driver.extraClassPath", "/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar")
      .set("spark.executor.extraClassPath", "/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Register the Brickhouse UDFs as temporary functions
    spark.sql("ADD JAR /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar")
    spark.sql("""CREATE TEMPORARY FUNCTION numeric_range AS 'brickhouse.udf.collect.NumericRange' USING JAR '/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar'""")
    spark.sql("""CREATE TEMPORARY FUNCTION array_index AS 'brickhouse.udf.collect.ArrayIndexUDF' USING JAR '/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar'""")

    // Read the XML once to infer the schema, then re-read it with that schema
    val df = spark.read.format("com.databricks.spark.xml").option("rowTag", "DataArea").load("///scripts/Store_Site_edited.xml")
    val schema = df.schema
    val parse_df = spark.read.format("com.databricks.spark.xml").option("rowTag", "DataArea").schema(schema).load("///scripts/Store_Site_edited.xml")

    parse_df.registerTempTable("complete_df")

    // Explode the Location array with the numeric_range and array_index UDFs
    spark.sql("""select current_timestamp() as load_dttm,
      actioncode,
      cast(regexp_replace(substr(creationTime,1,19),"T"," ") as timestamp) as start_dttm,
      n as array_index,
      array_index(Location,n) as Location
      from complete_df lateral view numeric_range(size(Location)) n1 as n""").registerTempTable("main_table_1")

    spark.sql("""select cast(location.id._value as String) as siteid,
      load_dttm,
      location.id._schemeagencyid as site_schemeagencyid,
      location.id._type as site_type,
      location.name as site_name,
      cast(location.coordinate.Latitude.DegreeMeasure._value as String) as latitude_degreemeasure,
      location.Coordinate.Latitude.DegreeMeasure._unitCode as latitude_degreemeasure_unitcode,
      cast(location.coordinate.Longitude.DegreeMeasure._value as String) as longitude_degreemeasure
      from main_table_1""")
      .write.option("header", "true").format("csv").mode("overwrite").save("///scripts/output6")

    spark.stop()
  }
}

Not sure what I am missing; any suggestions would be very helpful.


Solution

  • The error could be due to the jars being unavailable on the worker nodes. In cluster mode the driver runs on one of the cluster nodes, so the local paths referenced by ADD JAR and CREATE TEMPORARY FUNCTION ... USING JAR must exist on whichever node the driver lands on. Please try pushing the jars into /usr/hdp/2.6.5.3015-8/spark2/jars/ on all worker nodes.
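
    Below is a minimal sketch of one way to distribute the jars, assuming passwordless SSH and a hypothetical workers.txt file listing one worker hostname per line (adjust hostnames and paths to your cluster):

    # workers.txt is a hypothetical file with one worker hostname per line
    for host in $(cat workers.txt); do
      scp /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar \
          /usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar \
          /usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
          "$host":/usr/hdp/2.6.5.3015-8/spark2/jars/
    done

    Once the jars sit at the same path on every node, the existing spark.driver.extraClassPath / spark.executor.extraClassPath settings and the ADD JAR statement can resolve them regardless of which node the driver and executors run on.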