I have a collection of JSON files containing Twitter data that I'd like to use as a data source for Structured Streaming in Databricks/Spark. The JSON files have the following structure:
[{...tweet data...},{...tweet data...},{...tweet data...},...]
My PySpark code:
# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source
streamingInputDF = (
    spark.readStream
    .schema(json_schema)
    .json(tweetstore)
)
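For illustration, a minimal json_schema covering the columns selected below might look like this (my actual schema is defined earlier in the notebook; the field types here are guesses):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Illustrative only: the real json_schema is defined elsewhere
# in the notebook, and the field types here are guesses
json_schema = StructType([
    StructField("run_stamp", StringType()),
    StructField("user", StringType()),
    StructField("id", LongType()),
    StructField("source", StringType()),
    StructField("favorite_count", LongType()),
    StructField("retweet_count", LongType()),
])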
# Check that this is indeed a streaming DataFrame (returns True)
streamingInputDF.isStreaming
# Write the stream to an in-memory sink so it can be queried with SQL
streamingQuery = (
    streamingInputDF
    .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")
    .writeStream
    .format("memory")
    .queryName("tweetstream")
    .outputMode("append")
    .start()
)

streamingDF = spark.sql("select * from tweetstream order by 1 desc")
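I then count and preview the result, roughly like this:

# Count the rows collected so far and preview the result
print(f"Number of entries in dataframe: {streamingDF.count()}")
streamingDF.show()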
My output looks like this:
Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null |null|null|null |null |null |
|null |null|null|null |null |null |
|null |null|null|null |null |null |
From what I can tell, I probably need to use a UDF or explode() to parse the JSON array properly, but I haven't quite figured out how so far.
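The explode() route I've been considering would look something like this: read each file as a single string using the text source's wholetext option, parse it with from_json against an array schema, and explode into one row per tweet. This is an untested sketch:

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import ArrayType

# Untested sketch: read each file whole, parse it as a JSON array
# of tweet structs, then explode into one row per tweet
rawDF = (
    spark.readStream
    .option("wholetext", True)
    .text(tweetstore)
)
tweetsDF = (
    rawDF
    .select(from_json("value", ArrayType(json_schema)).alias("tweets"))
    .select(explode("tweets").alias("tweet"))
    .select("tweet.*")
)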
Documenting the answer for others who might stumble upon this: I realised the JSON didn't have one object per line, which is what Spark's JSON source expects by default (the JSON Lines format). The key was to add .option("multiLine", True), which makes Spark parse each file as a single JSON document, so a top-level array yields one row per element, i.e.:
streamingInputDF = (
    spark.readStream
    .option("multiLine", True)
    .schema(json_schema)
    .json(tweetstore)
)
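A quick way to sanity-check the option before wiring up the stream is a one-off batch read over the same folder (a suggested check, not something from my original run):

# Batch read with the same options; if these rows parse correctly,
# the streaming read will too
batchDF = (
    spark.read
    .option("multiLine", True)
    .schema(json_schema)
    .json(tweetstore)
)
batchDF.show(5)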