I'm trying to read a CSV, compute event state using mapGroupsWithState, and write the result to Kafka. The code below works if I comment out the mapGroupsWithState piece. I'm using Spark 2.3.1.
val event = spark.read.option("header","true").csv(path)
val eventSession = imsi.orderBy("event_timestamp")
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateAcrossEvents)
eventSession.toJSON.write.format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", outputTopic).save
Error:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 85 in stage 11.0 failed 8 times, most recent failure: Lost task 85.7 in stage 11.0 (TID 53, XXX, executor 2): java.lang.NoClassDefFoundError: Could not initialize class xxxx$
at xxx.imsiProcessor$$anonfun$run$1$$anonfun$3.apply(xx.scala:86)
at xxx.imsiProcessor$$anonfun$run$1$$anonfun$3.apply(xx.scala:86)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$3.apply(KeyValueGroupedDataset.scala:279)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$3.apply(KeyValueGroupedDataset.scala:279)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$12.apply(objects.scala:361)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$12.apply(objects.scala:360)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:337)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:336)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:367)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2493)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:933)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:924)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:924)
at com.telstra.elbrus.core.imsiProcessor$.spark$lzycompute(ImsiProcessor.scala:38)
I was able to get the code running by getting rid of a few of the extends clauses. Once it was stripped down to bare-bones code, the job started running.
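For anyone hitting the same trace: the "Caused by" section shows SparkSession.Builder.getOrCreate being called from imsiProcessor$.spark$lzycompute, which suggests the function passed to mapGroupsWithState referenced the imsiProcessor object, so each executor tried to initialize that object and build its own SparkSession there. That fails on an executor with "A master URL must be set", and later tasks report "Could not initialize class". Below is a minimal sketch of one way to avoid this, assuming the removed extends clauses were what pulled the session into that object. Event, SessionState, SessionLogic, ImsiJob, the merge logic, and the argument order are all hypothetical stand-ins, since the real schema and update function are not shown above.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical record types; the real CSV schema is not shown in the post.
case class Event(key: String, event_timestamp: Long)
case class SessionState(key: String, count: Long, lastTimestamp: Long)

// Standalone object with no SparkSession field, so initializing it on an
// executor never calls SparkSession.builder.getOrCreate().
object SessionLogic {
  // Pure merge of previous state with the new events for one key.
  def merge(prev: Option[SessionState], key: String, events: Iterator[Event]): SessionState = {
    val start = prev.getOrElse(SessionState(key, 0L, Long.MinValue))
    events.foldLeft(start) { (s, e) =>
      s.copy(count = s.count + 1,
             lastTimestamp = math.max(s.lastTimestamp, e.event_timestamp))
    }
  }

  // Function handed to mapGroupsWithState; it only touches its arguments.
  def updateAcrossEvents(key: String,
                         events: Iterator[Event],
                         state: GroupState[SessionState]): SessionState = {
    val updated = merge(state.getOption, key, events)
    state.update(updated)
    updated
  }
}

object ImsiJob {
  def main(args: Array[String]): Unit = {
    // Assumed argument order: input path, Kafka brokers, output topic.
    val Array(path, brokers, outputTopic) = args

    // The session is a local val in main, so it exists on the driver only.
    val spark = SparkSession.builder.appName("imsi-session").getOrCreate()
    import spark.implicits._

    val events = spark.read
      .option("header", "true")
      .schema(Encoders.product[Event].schema) // typed columns instead of all strings
      .csv(path)
      .as[Event]

    val eventSession = events
      .groupByKey(_.key)
      .mapGroupsWithState[SessionState, SessionState](
        GroupStateTimeout.NoTimeout())(SessionLogic.updateAcrossEvents _)

    eventSession.toJSON.write.format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", outputTopic)
      .save()

    spark.stop()
  }
}
```

The point of the layout is that nothing reachable from the closure holds a SparkSession: the session is created inside main, and the per-group logic lives in an object that only depends on its arguments.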