I'm trying to do some very basic stream processing using PySpark (3.2.4) Structured Streaming, with Kafka as my data source. Just to get up and running, I'm attempting the really basic task of parsing a field changeType from my source messages and appending it out to the console. However, when I run my script I get a pyspark.errors.exceptions.captured.StreamingQueryException. See below for the script and traceback:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType

spark = SparkSession \
    .builder \
    .appName("PysparkTesting") \
    .getOrCreate()

spark.sparkContext.setLogLevel('WARN')

schema = StructType().add("changeType", StringType())

stream_data = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic") \
    .load()

parsed_data = stream_data.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.changeType")

query = parsed_data.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
I run the script using the command
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 pyspark_structured_streaming.py
which I would expect to run fine, but instead it gives the error:
Traceback (most recent call last):
File "/Users/johnf/my_project/pyspark/pyspark_structured_streaming.py", line 31, in <module>
query.awaitTermination()
File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 290ef0e0-dca4-4a2a-b767-bd171210c2e4, runId = 61f00034-c12e-42a9-b55e-0ee133f8211a] terminated with exception: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field
My first thought was that this suggests the topic doesn't exist, hence an error about allowAutoTopicCreation. However, the topic definitely does exist, and I can consume messages from it using KafkaConsumer from kafka-python.
In the same environment as Spark I also have kafka-python version 2.0.2 installed. The Kafka brokers I am trying to access are on remote servers at my company, running Kafka version 2.6.0.3-8.
using PySpark (3.2.4)
Then you need to use version 3.2.4 of the Kafka connector as well: the version of the spark-sql-kafka-0-10 package you pass to --packages must match the Spark version you have installed. Your spark-submit command pulls in the 3.4.1 connector against Spark 3.2.4, and that mismatch is what triggers the UnsupportedVersionException. Use:
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4
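As a sanity check, you can build the package coordinate from the Spark version string so the two can't drift apart (a minimal sketch; at runtime you could take the version from pyspark.__version__, and the Scala version 2.12 is assumed to match your Spark build):

```python
# Build the spark-sql-kafka package coordinate from the Spark version,
# so the connector always matches the installed Spark release.
spark_version = "3.2.4"  # at runtime: pyspark.__version__
scala_version = "2.12"   # assumed: the Scala build of your Spark distribution
package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
print(package)
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4
```

You would then pass this string to spark-submit --packages (or set it via spark.jars.packages in the session config) instead of hard-coding a connector version that can fall out of sync with your Spark install.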