Tags: scala, hadoop, apache-kafka, hive, spark-streaming

java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir while trying to save a table to Hive


I am trying to read a kafka stream and save it to Hive as a table.

The consumer code is:

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger.ProcessingTime


object testH {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("MyApp")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group-id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("my-topic")

    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("age", IntegerType)

    // Read from Kafka topic
    val stream: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")

    // Parse JSON data and write each micro-batch to a Hive table
    val query = stream
      .select(from_json(col("value"), schema).as("data"))
      .selectExpr("data.id", "data.name", "data.age")
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .write
          .format("hive")
          .mode(SaveMode.Append)
          .saveAsTable("test")
      }
      .start()

    query.awaitTermination()
  }
}

My build.sbt:

ThisBuild / version := "0.1.0-SNAPSHOT"

name := "kafka-spark-hive"

version := "1.0"

scalaVersion := "2.12.15"

libraryDependencies ++= Seq(

  "org.apache.spark" %% "spark-hive" % "3.3.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "org.apache.spark" %% "spark-streaming" % "3.3.2",


  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "com.typesafe" % "config" % "1.4.2",
  "org.apache.hive" % "hive-exec" % "3.1.3" ,
  "org.apache.hive" % "hive-metastore" % "3.1.3" ,
  "org.apache.hive" % "hive-common" % "3.1.3" ,
  
  "org.apache.hadoop" % "hadoop-common" % "3.3.2" ,
  "org.apache.hadoop" % "hadoop-hdfs" % "3.3.2",
  "org.apache.hadoop" % "hadoop-auth" % "3.3.2"
)



I get this error: java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z

I tried downgrading and upgrading some dependencies, but I don't know which one caused the issue.


Solution

  • First, see these guides on how to investigate a NoSuchMethodError:

    How do I fix a NoSuchMethodError?

    https://reflectoring.io/nosuchmethod/

    https://www.baeldung.com/java-nosuchmethod-error

    https://www.javatpoint.com/java-lang-nosuchmethoderror

    https://www.geeksforgeeks.org/how-to-solve-java-lang-nosuchmethoderror-in-java/

    The method

    org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z

    is from hive-common.
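    To confirm which hive-common jar the class is actually loaded from at runtime, you can print its code source. A minimal sketch (`jarOf` is a helper name introduced here, not part of any library):

```scala
// Locate the jar (code source) a class was loaded from.
// Classes loaded by the bootstrap classloader (e.g. java.lang.String)
// have no code source, so fall back to a placeholder.
def jarOf(cls: Class[_]): String =
  Option(cls.getProtectionDomain.getCodeSource)
    .map(_.getLocation.toString)
    .getOrElse("<bootstrap classloader>")

// In your application you could then run, for example:
//   println(jarOf(Class.forName("org.apache.hadoop.hive.common.FileUtils")))
// and check whether the printed path points at the 2.3.9 or the 3.1.3 jar.
```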

    This is the output of `sbt dependencyTree`:

    $ sbt dependencyTree > output.txt
    
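    Since the full tree output is large (29 MB here), the `whatDependsOn` task from the same plugin that provides `dependencyTree` (built into sbt 1.4+ as DependencyTreePlugin) can narrow the search to the conflicting module; a sketch:

    $ sbt "whatDependsOn org.apache.hive hive-common 2.3.9"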

    https://github.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo

    https://raw.githubusercontent.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo/main/output.txt (29 MB)

    [info] kafka-spark-hive:kafka-spark-hive_2.12:1.0 [S]
    [info]   +-...
    [info]   +-org.apache.spark:spark-hive_2.12:3.3.2
    [info]   | +-...
    [info]   | | 
    [info]   | +-org.apache.hive:hive-common:2.3.9 (evicted by: 3.1.3)
    [info]   | +-org.apache.hive:hive-common:3.1.3
    

    In hive-common 3.1.3 the signature of `mkdir` is:

    https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/org/apache/hadoop/hive/common/FileUtils.html#mkdir-org.apache.hadoop.fs.FileSystem-org.apache.hadoop.fs.Path-org.apache.hadoop.conf.Configuration-

    public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
                                org.apache.hadoop.fs.Path f,
                                org.apache.hadoop.conf.Configuration conf)
                         throws IOException
    

    but in hive-common 2.3.9 the signature was:

    https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r2.3.9/api/org/apache/hadoop/hive/common/FileUtils.html#mkdir-org.apache.hadoop.fs.FileSystem-org.apache.hadoop.fs.Path-boolean-org.apache.hadoop.conf.Configuration-

    public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
                                org.apache.hadoop.fs.Path f,
                                boolean inheritPerms,
                                org.apache.hadoop.conf.Configuration conf)
                         throws IOException
    

    i.e., with an extra `boolean inheritPerms` parameter.

    So you seem to have a conflict between org.apache.hive:hive-common:3.1.3 and org.apache.spark:spark-hive_2.12:3.3.2: spark-hive 3.3.2 expects hive-common 2.3.9, not 3.1.3.

    https://repo1.maven.org/maven2/org/apache/spark/spark-hive_2.12/3.3.2/spark-hive_2.12-3.3.2.pom

    The signature of `mkdir` changed in hive-common 3.0.0:

    https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.0.0/api/org/apache/hadoop/hive/common/FileUtils.html#mkdir-org.apache.hadoop.fs.FileSystem-org.apache.hadoop.fs.Path-org.apache.hadoop.conf.Configuration-

    So, since you're using Spark 3.3.2, try downgrading

    "org.apache.hive" % "hive-exec"      % "3.1.3" ,
    "org.apache.hive" % "hive-metastore" % "3.1.3" ,
    "org.apache.hive" % "hive-common"    % "3.1.3"
    

    to

    "org.apache.hive" % "hive-exec"      % "2.3.9" ,
    "org.apache.hive" % "hive-metastore" % "2.3.9" ,
    "org.apache.hive" % "hive-common"    % "2.3.9"
    

    and check that no other conflicts arise.
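    Alternatively, instead of changing the versions by hand, sbt's `dependencyOverrides` setting can pin the Hive artifacts to the versions spark-hive 3.3.2 was built against. A build.sbt sketch (keeping the rest of the dependency list as-is):

```scala
// build.sbt fragment: force the Hive artifacts to the versions
// spark-hive 3.3.2 expects, regardless of what other dependencies pull in.
dependencyOverrides ++= Seq(
  "org.apache.hive" % "hive-exec"      % "2.3.9",
  "org.apache.hive" % "hive-metastore" % "2.3.9",
  "org.apache.hive" % "hive-common"    % "2.3.9"
)
```

    Note that `dependencyOverrides` only changes the resolved version; it does not add an artifact that nothing else depends on.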
