I am trying to create a POC of Spark Structured Streaming with Kafka using Python; the code is below.
Spark: 2.3.2, Kafka: 2.11-2.1.0 (Scala 2.11, Kafka 2.1.0), Hadoop: 2.8.3
import sys

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Broker list and topic name come in as command-line arguments.
brokers, topic = sys.argv[1:]
print("broker : {} and Topic : {}".format(brokers, topic))

# Subscribe to the topic as a streaming DataFrame; key and value arrive as binary.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .load()

numeric_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
numeric_df.createOrReplaceTempView("updates")
average = spark.sql("select value from updates")
print(average)  # prints the DataFrame schema, not streaming rows

# Write each micro-batch to the console until terminated.
query = average \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
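As an aside, the DataFrame is named average, but no aggregation happens yet. If the end goal is a running average of the streamed values, a sketch like the one below should work (assuming the Kafka message values are numeric strings; the cast and the avg_value alias are my assumptions, not part of the original code):

# Sketch of a streaming average, assuming message values are numeric strings.
running_avg = spark.sql(
    "select avg(cast(value as double)) as avg_value from updates")

# Streaming aggregations require "complete" (or "update") output mode;
# "append" would fail because aggregated rows change between batches.
avg_query = running_avg \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()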
On spark-submit, I get the error below.
.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 spark_struct.py localhost:9092 tempre
19/01/07 13:34:55 ERROR MicroBatchExecution: Query [id = 03fdd202-d795-4f69-ad8d-e712568e3d88, runId = 03dd8e92-ffee-414d-99a1-f43a819630dd] terminated with error
java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
This issue was solved by moving from Java 11 to Java 8. Spark 2.3.x supports only Java 8: the ASM 5 bytecode reader that ClosureCleaner relies on cannot parse class files newer than Java 8, which is exactly the IllegalArgumentException thrown in ClassReader above. Java 11 support only arrived with Spark 3.0.
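To confirm which JVM the driver is actually running on, you can ask the Py4J gateway directly. A small diagnostic sketch (note that _jvm is an internal PySpark handle, so this is best-effort rather than a public API):

# Diagnostic sketch: print the JVM version backing the Spark driver.
# spark.sparkContext._jvm is PySpark's internal Py4J gateway handle.
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))
# On Spark 2.3.x this should report 1.8.x; a newer JVM (e.g. 11)
# reproduces the ASM IllegalArgumentException above.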