I am using Spark 2.3.0 and Kafka 1.0.0.3. I have created a Spark read stream:
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost.cluster.com:6667") \
    .option("subscribe", "test_topic") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
It runs successfully, and then I write the stream:
df_write = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") \
    .writeStream \
    .format("csv") \
    .option("path", "/test_streaming_data") \
    .option("checkpointLocation", "test_streaming_data/checkpoint") \
    .start()
But when I run
df_write.awaitTermination()
it throws an error:
Py4JJavaError: An error occurred while calling o264.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: [id = c140e21c-f827-4b1d-9182-b3f68a405fad, runId = 47d4b5cb-f223-4235-bef1-84871a2f85c8]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[test_topic]]: {"test_topic":{"0":31300}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [cast(key#21 as string) AS key#124, cast(value#22 as string) AS value#125, cast(timestamp#23 as timestamp) AS timestamp#126]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22, cast(timestamp#12 as timestamp) AS timestamp#23]
+- StreamingExecutionRelation KafkaSource[Subscribe[test_topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 11, localhost.cluster.com, executor 2): java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition
Can anyone please help me sort out this issue?
I have tried replacing the JAR libraries with updated ones, but the issue persists.
"tried replacing the jar libraries with updated ones"
It's unclear what exactly you did here, but you should never modify JAR files directly.
Use the --packages option when you run the app. For example, for the latest Spark 2.3.x, you need the spark-sql-kafka-0-10 package:
spark-submit --master=local \
--packages='org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4' \
app.py
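If you are launching Spark from a notebook rather than via spark-submit, you can pass the same coordinate through the spark.jars.packages config when building the session. A minimal sketch, assuming no SparkContext is already running (the setting only takes effect before the JVM starts, and the app name here is made up):

from pyspark.sql import SparkSession

# spark.jars.packages must be set before the session (and its JVM) is created;
# setting it on an already-running context has no effect.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("kafka-csv-sink") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4") \
    .getOrCreate()

Note that the _2.11 suffix must match the Scala version of your Spark build (Spark 2.3.x is built against Scala 2.11 by default).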
I have a Jupyter example here - https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb