I have kafka_2.13-2.7.0 on Ubuntu 20.04. I run the Kafka server and ZooKeeper, then create a topic and send a text file to it via nc -lk 9999. The topic is full of data. I also have spark-3.0.1-bin-hadoop2.7 on my system. I want to use the Kafka topic as a source for Spark Structured Streaming with Python. My code is like this:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("APP") \
    .getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
I run the above code via spark-submit with this command:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py
The code runs without any exception and I get this output, which matches the one shown on the Spark site:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
My question: the Kafka topic is full of data, but running the code produces no data in the output. Could you please tell me what is wrong here?
The code as is will not print out any data; it only prints the schema once.
You can follow the instructions given in the general Structured Streaming Guide and the Structured Streaming + Kafka Integration Guide to see how to print out data to the console. Remember that reading data in Spark is a lazy operation and nothing is done without an action (typically a writeStream operation).
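To make the laziness point concrete, here is a small plain-Python analogy (it does not use Spark at all; the list `topic` and the helper `read_records` are hypothetical stand-ins). Building the pipeline reads nothing, and only consuming it triggers the reads, much like a streaming DataFrame does nothing until a query is started:

```python
def read_records(source, log):
    """Lazily yield records and note each read in `log` so we can see when it happens."""
    for record in source:
        log.append(f"read {record}")
        yield record


reads = []               # filled in only when a record is actually read
topic = ["a", "b", "c"]  # hypothetical stand-in for the Kafka topic

# "Transformation": this only describes the pipeline, nothing is read yet.
pipeline = (value.upper() for value in read_records(topic, reads))
reads_before_action = list(reads)

# "Action": consuming the pipeline is what finally triggers the reads.
result = list(pipeline)
reads_after_action = list(reads)

print(reads_before_action)
print(result)
print(reads_after_action)
```

This is only an analogy: Spark builds a query plan rather than a Python generator, but the effect on the original code is the same, because defining df and calling printSchema() never asks for any records.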
If you extend the code as shown below, you should see the selected data (key and value) printed out to the console:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("APP") \
    .getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()
# writeStream ... start() is what actually launches the streaming query
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .start()
# block the driver so the query keeps running and printing batches
query.awaitTermination()
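If you only want to check once that the topic really contains data, a batch read may be easier to debug than the streaming query above. The following is a minimal sketch under that assumption; the helper names `kafka_batch_options` and `show_topic_once` are made up for illustration, and it expects the same spark-sql-kafka package on the classpath as in your spark-submit command:

```python
def kafka_batch_options(bootstrap_servers, topic):
    """Options for a bounded (batch) read of everything currently in the topic."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    }


def show_topic_once(spark, bootstrap_servers="localhost:9092", topic="sparktest"):
    """Read the topic as a static DataFrame and print key/value as strings."""
    df = (
        spark.read  # batch reader, not readStream
        .format("kafka")
        .options(**kafka_batch_options(bootstrap_servers, topic))
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    )
    df.show(truncate=False)  # show() is an action, so this triggers the read
    return df
```

With the SparkSession created as above, calling show_topic_once(spark) should print the records currently in the topic and then return, instead of running continuously like the streaming query.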