I am trying to read a Kafka stream and save it to Hive as a table.
The consumer code is:
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
object testH {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("MyApp")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // Note: kafkaParams and topics are DStream-style settings; they are not
    // used by the Structured Streaming reader below.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group-id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("my-topic")

    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("age", IntegerType)

    // Read from Kafka topic
    val stream: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")

    // Parse JSON data and write to Hive table
    val query = stream
      .select(from_json(col("value"), schema).as("data"))
      .selectExpr("data.id", "data.name", "data.age")
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .write
          .format("hive")
          .mode(SaveMode.Append)
          .saveAsTable("test")
      }
      .start()

    query.awaitTermination()
  }
}
My build.sbt:
ThisBuild / version := "0.1.0-SNAPSHOT"

name := "kafka-spark-hive"
version := "1.0"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-hive" % "3.3.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "org.apache.spark" %% "spark-streaming" % "3.3.2",
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "com.typesafe" % "config" % "1.4.2",
  "org.apache.hive" % "hive-exec" % "3.1.3",
  "org.apache.hive" % "hive-metastore" % "3.1.3",
  "org.apache.hive" % "hive-common" % "3.1.3",
  "org.apache.hadoop" % "hadoop-common" % "3.3.2",
  "org.apache.hadoop" % "hadoop-hdfs" % "3.3.2",
  "org.apache.hadoop" % "hadoop-auth" % "3.3.2"
)
I get this error:
java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z
I tried downgrading and upgrading some dependencies, but I don't know which one caused the issue.
Here is how to investigate a NoSuchMethodError:
How do I fix a NoSuchMethodError?
https://reflectoring.io/nosuchmethod/
https://www.baeldung.com/java-nosuchmethod-error
https://www.javatpoint.com/java-lang-nosuchmethoderror
https://www.geeksforgeeks.org/how-to-solve-java-lang-nosuchmethoderror-in-java/
The method org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z is from hive-common.
This is the output of sbt dependencyTree:
$ sbt dependencyTree > output.txt
https://github.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo
https://raw.githubusercontent.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo/main/output.txt (29 MB)
[info] kafka-spark-hive:kafka-spark-hive_2.12:1.0 [S]
[info] +-...
[info] +-org.apache.spark:spark-hive_2.12:3.3.2
[info] | +-...
[info] | |
[info] | +-org.apache.hive:hive-common:2.3.9 (evicted by: 3.1.3)
[info] | +-org.apache.hive:hive-common:3.1.3
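Rather than scanning the full 29 MB tree by hand, you can pin the conflicting artifact directly in build.sbt. As a sketch (assuming sbt 1.x, where dependencyTree is built in), a dependencyOverrides entry forces a single hive-common version onto the classpath regardless of what transitive dependencies pull in:

```scala
// Filter the tree for the conflicting artifact from the command line:
//   sbt dependencyTree | grep hive-common

// Or pin hive-common explicitly so only one version is resolved
// (sketch; use the version your Spark release was built against):
dependencyOverrides += "org.apache.hive" % "hive-common" % "2.3.9"
```

Note that overriding only papers over the conflict if your own code genuinely needs the newer Hive API; here, downgrading (below) is the cleaner fix.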
In hive-common 3.1.3 the signature of mkdir is

public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
                            org.apache.hadoop.fs.Path f,
                            org.apache.hadoop.conf.Configuration conf)
    throws IOException
but in hive-common 2.3.9 the signature was

public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
                            org.apache.hadoop.fs.Path f,
                            boolean inheritPerms,
                            org.apache.hadoop.conf.Configuration conf)
    throws IOException

i.e. with an extra boolean parameter (which matches the Z in the error's method descriptor).
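You can confirm which overloads are actually on your runtime classpath with a few lines of reflection. Below is a minimal sketch of the technique; it uses java.util.Arrays as a stand-in class so the snippet runs even without Hive on the classpath. In your project, pass "org.apache.hadoop.hive.common.FileUtils" and "mkdir" instead.

```scala
object CheckOverloads {
  // List the parameter lists of every public method with the given name
  // on the class that is actually loaded at runtime.
  def overloads(className: String, methodName: String): Seq[String] =
    Class.forName(className)
      .getMethods.toSeq
      .filter(_.getName == methodName)
      .map(m => m.getParameterTypes.map(_.getSimpleName)
                  .mkString(s"$methodName(", ", ", ")"))

  def main(args: Array[String]): Unit =
    // Stand-in example; replace with the Hive class/method when debugging:
    overloads("java.util.Arrays", "asList").foreach(println)
}
```

If the printed parameter lists do not include the one from the stack trace, the wrong artifact version won on the classpath.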
So you seem to have a conflict between org.apache.hive:hive-common:3.1.3 and org.apache.spark:spark-hive_2.12:3.3.2. spark-hive 3.3.2 expects hive-common 2.3.9, not 3.1.3:
https://repo1.maven.org/maven2/org/apache/spark/spark-hive_2.12/3.3.2/spark-hive_2.12-3.3.2.pom
The signature of mkdir changed in hive-common 3.0.0.
So since you're using Spark 3.3.2, try downgrading

"org.apache.hive" % "hive-exec" % "3.1.3",
"org.apache.hive" % "hive-metastore" % "3.1.3",
"org.apache.hive" % "hive-common" % "3.1.3"

to

"org.apache.hive" % "hive-exec" % "2.3.9",
"org.apache.hive" % "hive-metastore" % "2.3.9",
"org.apache.hive" % "hive-common" % "2.3.9"

and check whether there are any other conflicts.
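After changing the versions, one quick way to verify the fix (a sketch, assuming sbt 1.x with the built-in dependencyTree task) is to re-resolve and check that hive-common now appears at a single version:

```shell
# Inspect which hive-common version wins after the downgrade:
sbt dependencyTree | grep hive-common

# Expect lines mentioning only org.apache.hive:hive-common:2.3.9,
# with no "(evicted by: ...)" marker for a different version.
```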
See also: Run a scala code jar appear NoSuchMethodError:scala.Predef$.refArrayOps