python, apache-spark, pyspark, apache-kafka

PySpark Structured Streaming - error related to allowAutoTopicCreation


I'm trying to do some very basic stream processing with PySpark (3.2.4) Structured Streaming, using Kafka as my data source. Just to get up and running, I'm attempting the simple task of parsing a field, changeType, from my source messages and appending it to the console. However, when I run my script I get a pyspark.errors.exceptions.captured.StreamingQueryException. See below for the script and traceback:

pyspark_structured_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType

spark = SparkSession \
    .builder \
    .appName("PysparkTesting") \
    .getOrCreate()

spark.sparkContext.setLogLevel('WARN')

schema = StructType().add("changeType", StringType())

stream_data = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic") \
    .load()


parsed_data = stream_data.selectExpr("CAST(value as STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.changeType")

query = parsed_data.writeStream \
          .outputMode("append") \
          .format("console") \
          .start()

query.awaitTermination()

I run the script using the command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 pyspark_structured_streaming.py

I expected this to run fine, but it fails with the following error:

Traceback
Traceback (most recent call last):
  File "/Users/johnf/my_project/pyspark/pyspark_structured_streaming.py", line 31, in <module>
    query.awaitTermination()
  File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
  File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/johnf/.conda/envs/my_project/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 290ef0e0-dca4-4a2a-b767-bd171210c2e4, runId = 61f00034-c12e-42a9-b55e-0ee133f8211a] terminated with exception: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field

My first thought was that this suggests the topic doesn't exist, hence an error about allowAutoTopicCreation. However, the topic definitely does exist, and I can consume messages from it using KafkaConsumer from kafka-python.

In the same environment as Spark I also have kafka-python version 2.0.2 installed. The Kafka brokers I am trying to access are on remote servers in my company, Kafka version 2.6.0.3-8.


Solution

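  • using PySpark (3.2.4)

    Then you need to use version 3.2.4 of the Spark Kafka connector package as well. The version at the end of the --packages coordinate has to match your Spark version, and your command pulls in 3.4.1 instead:

    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4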
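
To avoid hard-coding the version, one option is to derive the coordinate from whatever PySpark is installed in the environment. The following is only a minimal sketch: the helper names kafka_connector_coordinate and installed_pyspark_version are made up for illustration, the Scala suffix 2.12 is an assumption (adjust it if your Spark build uses a different Scala version), and it assumes the spark-submit on your PATH belongs to the same installation as the pip/conda pyspark package.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def kafka_connector_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate of the Kafka connector for a given Spark version.

    scala_version defaults to 2.12, which is an assumption about the Spark build.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def installed_pyspark_version() -> Optional[str]:
    """Return the version of the installed pyspark package, or None if it is absent."""
    try:
        return version("pyspark")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    spark_version = installed_pyspark_version()
    if spark_version is None:
        print("pyspark is not installed in this environment")
    else:
        # Print the spark-submit command with a connector version matching PySpark.
        print(
            "spark-submit --packages",
            kafka_connector_coordinate(spark_version),
            "pyspark_structured_streaming.py",
        )
```

Running this in the same environment as the streaming script prints a spark-submit command whose connector version matches the installed PySpark, which also makes it easy to spot when the version you believe you are running differs from the one actually installed.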