apache-spark, spark-streaming

mapGroupsWithState throwing error Caused by: java.lang.NoClassDefFoundError: Could not initialize


I'm trying to read a CSV, derive the event state with mapGroupsWithState, and write the result to Kafka. The code below works if I comment out the mapGroupsWithState piece. I'm using Spark version 2.3.1.

val event = spark.read.option("header", "true").csv(path)
val eventSession = event.orderBy("event_timestamp")
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateAcrossEvents)
eventSession.toJSON.write.format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", outputTopic)
  .save()

Error

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 85 in stage 11.0 failed 8 times, most recent failure: Lost task 85.7 in stage 11.0 (TID 53, XXX, executor 2): java.lang.NoClassDefFoundError: Could not initialize class xxxx$
at xxx.imsiProcessor$$anonfun$run$1$$anonfun$3.apply(xx.scala:86)
at xxx.imsiProcessor$$anonfun$run$1$$anonfun$3.apply(xx.scala:86)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$3.apply(KeyValueGroupedDataset.scala:279)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$3.apply(KeyValueGroupedDataset.scala:279)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$12.apply(objects.scala:361)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$12.apply(objects.scala:360)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:337)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:336)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:367)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2493)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:933)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:924)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:924)
    at com.telstra.elbrus.core.imsiProcessor$.spark$lzycompute(ImsiProcessor.scala:38)
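The inner exception points at the likely cause: the closure passed to mapGroupsWithState drags in the enclosing imsiProcessor object, and the executors then try to run that object's initializer, including the lazy spark field, in a JVM where no master URL is configured. On the JVM, the first failed initialization surfaces as ExceptionInInitializerError; every access after that (for example, a retried task) sees NoClassDefFoundError: Could not initialize class ...$ instead. A minimal sketch of that mechanism, with a hypothetical Boom object standing in for the processor:

```scala
object Boom {
  // Stand-in for a SparkSession built lazily on an executor:
  // the initializer throws, so the object can never be initialized.
  val session: String = sys.error("A master URL must be set in your configuration")
}

// First access: the initializer runs and fails with ExceptionInInitializerError
// (its cause is the RuntimeException thrown above).
val first = try { Boom.session; None } catch { case t: Throwable => Some(t) }
println(first.get.getClass.getName)

// Any later access: the JVM refuses to re-run the failed initializer and
// throws NoClassDefFoundError: Could not initialize class Boom$ instead,
// which is exactly the error the failed-and-retried Spark tasks report.
val second = try { Boom.session; None } catch { case t: Throwable => Some(t) }
println(second.get.getClass.getName)
```

This is why the task's stack trace shows NoClassDefFoundError at the top while the real failure, the SparkSession construction, only appears in the "Caused by" section of one attempt.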

Solution

  • I was able to get the code running by removing a few extends clauses from the processor; the bare-bones code then ran.
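That fix is consistent with the cause above: if the state-update function lives in an object (or inherits from a trait) that also holds the SparkSession, an executor has to initialize that whole object just to call the function. Keeping the update function in a small object with no driver-only fields avoids the problem. A hedged sketch with made-up names (StateFunctions, update standing in for updateAcrossEvents; the String field standing in for the SparkSession):

```scala
// Problematic shape: the function shares an object with the session,
// so calling it on an executor forces the session to initialize there too.
object ProcessorWithSession {
  val spark: String = sys.error("A master URL must be set") // stand-in for SparkSession
  def update(n: Int): Int = n + 1
}

// Safer shape: the update logic lives alone, with nothing driver-only in
// the enclosing object, so executors can initialize and call it freely.
object StateFunctions {
  def update(n: Int): Int = n + 1
}

println(StateFunctions.update(1)) // 2
// ProcessorWithSession.update(1) would throw ExceptionInInitializerError,
// even though update itself never touches the spark field.
```

The same applies to the real updateAcrossEvents: make sure the object it belongs to, and everything that object extends, can be initialized on an executor without touching the SparkSession.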