apache-spark, apache-kafka, spark-streaming

Getting an error in Spark Structured Streaming


I am trying to create a POC of Spark Structured Streaming with Kafka using Python; the code is below.

Spark 2.3.2, Kafka 2.11-2.1.0, Hadoop 2.8.3

import sys

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Broker list and topic name come from the command line
brokers, topic = sys.argv[1:]
print("broker : {} and Topic : {}".format(brokers, topic))

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", brokers) \
  .option("subscribe", topic) \
  .load()

# Cast the Kafka key/value byte payloads to strings
numeric_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
numeric_df.createOrReplaceTempView("updates")
average = spark.sql("select value from updates")
print(average)  # prints the DataFrame's schema representation, not its rows

query = average \
    .writeStream \
    .outputMode("append") \
    .format("console")\
    .start()

query.awaitTermination()

On spark-submit I get the error below.

.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 spark_struct.py localhost:9092 tempre

19/01/07 13:34:55 ERROR MicroBatchExecution: Query [id = 03fdd202-d795-4f69-ad8d-e712568e3d88, runId = 03dd8e92-ffee-414d-99a1-f43a819630dd] terminated with error
java.lang.IllegalArgumentException
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
        at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
        at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
        at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)

Solution

  • This issue was solved by moving from Java 11 to Java 8. Spark 2.3.2 does not support Java 11; its ClosureCleaner relies on ASM 5, which cannot read Java 11 class files, hence the IllegalArgumentException thrown from org.apache.xbean.asm5.ClassReader. A quick check is sketched below.
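
A quick way to confirm which Java spark-submit picks up is to run java -version and, if needed, point JAVA_HOME and PATH at a JDK 8 installation before re-running the job. A minimal sketch for Windows (the JDK install path is an assumption; adjust it to your machine):

:: Check which Java the shell resolves (should report 1.8.x, not 11)
java -version

:: Point JAVA_HOME at a JDK 8 install (example path, adjust to your setup)
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_201
set PATH=%JAVA_HOME%\bin;%PATH%

:: Re-run the job with the Kafka source package matching Spark 2.3.2 / Scala 2.11
.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 spark_struct.py localhost:9092 tempre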