apache-spark pyspark apache-kafka spark-structured-streaming

How to display a streaming DataFrame (as show fails with AnalysisException)?


I have some data that I'm streaming into a Kafka topic. I read this stream into a DataFrame and want to display the data inside the DataFrame:

import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

topic_name = "my-topic"
kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)

while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

readDF.writeStream.format("console").start()
readDF.show()

producer.close()

However I keep on getting this error:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
      File "test2.py", line 30, in <module>
        readDF.show()
      File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
        print(self._jdf.showString(n, 20))
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

I don't understand why the exception is raised, since I'm calling writeStream.start() right before show(). I tried removing selectExpr(), but that made no difference. Does anyone know how to display a stream-sourced DataFrame? I'm using Python 3.6.1, Kafka 0.10.2.1, and Spark 2.2.0.


Solution

  • A streaming DataFrame doesn't support the show() method. Calling start() launches a background thread that streams the input data to the sink, and since you are using the console sink, the data is printed to the console. You don't need to call show().

    Remove readDF.show() and add a sleep after start() so the query has time to run before the script exits. You should then see the data in the console, for example:

    import time

    query = readDF.writeStream.format("console").start()
    time.sleep(10)  # let the background query run for 10 seconds
    query.stop()
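
As a variation on the sleep-based snippet above, here is a minimal sketch that uses the query's own awaitTermination(timeout) instead of time.sleep(), and stops the query even if it fails. The helper name run_console_query and its seconds parameter are made up for illustration; stream_df is assumed to be a streaming DataFrame such as readDF.

```python
def run_console_query(stream_df, seconds=10):
    """Start a console-sink query, let it run for up to `seconds`, then stop it.

    `stream_df` is assumed to be a streaming DataFrame (e.g. readDF above).
    The function name and parameters are hypothetical, not part of Spark.
    """
    query = stream_df.writeStream.format("console").start()
    try:
        # Blocks for at most `seconds`; returns early if the query terminates,
        # and re-raises the query's exception if it failed.
        query.awaitTermination(seconds)
    finally:
        # Always stop the background query so the script can exit cleanly.
        query.stop()
    return query

# Usage (assumes readDF from the question):
# run_console_query(readDF, seconds=10)
```

This behaves like the sleep version, but it returns early if the query dies and surfaces the error instead of sleeping through it.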
    

    You also need to set startingOffsets to earliest. Otherwise the Kafka source starts from the latest offset and, because your producer has already finished sending by the time the query starts, it fetches nothing.

    readDF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("startingOffsets", "earliest") \
        .option("subscribe", topic_name) \
        .load()
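
If you want show()-style output rather than console batches, one option is to write the stream to the memory sink and query the resulting in-memory table as an ordinary batch DataFrame. The sketch below assumes spark and a streaming DataFrame such as readDF; the helper name show_stream_snapshot and the table name kafka_snapshot are made up for illustration. Note that processAllAvailable() is intended for testing and can block for a long time if data keeps arriving.

```python
def show_stream_snapshot(spark, stream_df, table_name="kafka_snapshot", rows=20):
    """Collect what is currently available in the stream and show() it.

    `spark` is a SparkSession and `stream_df` a streaming DataFrame.
    The function name and `table_name` default are hypothetical.
    """
    query = (
        stream_df.writeStream
        .format("memory")        # keep results in an in-memory table on the driver
        .queryName(table_name)   # the table is registered under this name
        .outputMode("append")
        .start()
    )
    try:
        # Wait until the data currently available in the source has been
        # processed and committed to the sink (intended for testing/debugging).
        query.processAllAvailable()
        # The in-memory table is a regular batch table, so show() works on it.
        spark.sql("SELECT * FROM {}".format(table_name)).show(rows, truncate=False)
    finally:
        query.stop()

# Usage (assumes spark and readDF from the snippets above):
# show_stream_snapshot(spark, readDF)
```

The memory sink holds all output on the driver, so this is only suitable for small amounts of data while debugging.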