Tags: json, apache-spark, twitter, pyspark, databricks

How do you parse an array of JSON objects into a Spark DataFrame?


I have a collection of JSON files containing Twitter data that I'd like to use as a data source for Structured Streaming in Databricks/Spark. The JSON files have the following structure:

[{...tweet data...},{...tweet data...},{...tweet data...},...]
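For concreteness, each file holds a single top-level JSON array rather than one object per line. A miniature, hypothetical example (field names taken from the columns I select below; the values are invented):

[
  {"run_stamp": "2020-05-01T10:00:00", "user": "alice", "id": 1, "source": "web", "favorite_count": 2, "retweet_count": 0},
  {"run_stamp": "2020-05-01T10:00:05", "user": "bob", "id": 2, "source": "android", "favorite_count": 0, "retweet_count": 1}
]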

My PySpark code:

# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source
streamingInputDF = (
  spark
    .readStream
    .schema(json_schema)
    .json(tweetstore)
)
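(json_schema is defined elsewhere in my notebook; a minimal, hypothetical version covering just the columns selected below would be:)

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema with only the columns used later;
# the real one mirrors the full tweet payload
json_schema = StructType([
    StructField("run_stamp", StringType()),
    StructField("user", StringType()),
    StructField("id", LongType()),
    StructField("source", StringType()),
    StructField("favorite_count", LongType()),
    StructField("retweet_count", LongType()),
])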

# Confirm the DataFrame is a streaming source (returns True)
streamingInputDF.isStreaming

# Sink the stream to an in-memory table so it can be queried with SQL
streamingQuery = (
  streamingInputDF
    .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")
    .writeStream
    .format("memory")
    .queryName("tweetstream")
    .outputMode("append")
    .start()
)

streamingDF = spark.sql("select * from tweetstream order by 1 desc")
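(The count and preview below come from something along the lines of:)

print("Number of entries in dataframe:", streamingDF.count())
streamingDF.show(3)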

My output looks like this:

Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id  |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |
+---------+----+----+------+--------------+-------------+

From what I can tell, I probably need a UDF or explode() to parse the JSON array properly, but I haven't figured out how (see the note at the end of the solution for a sketch of that route).


Solution

  • Documenting the answer for others who might stumble upon this: I realised the JSON didn't have one object per line, which is what Spark expects by default (the JSON Lines format). The key, then, was to add .option("multiline", True) (the documented spelling is multiLine, but option names are case-insensitive), i.e.:

    streamingInputDF = (
      spark
        .readStream
        .option("multiline", True)
        .schema(json_schema)
        .json(tweetstore)
    )
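    A quick batch read with the same option is a handy sanity check before starting the stream (a sketch, reusing the same json_schema):

      # One-off batch read (not a stream) to confirm the fields now parse
      checkDF = (
        spark.read
          .option("multiline", True)
          .schema(json_schema)
          .json(tweetstore)
      )
      checkDF.select("run_stamp", "user", "id").show(3)

    Note that with multiline enabled, each file is parsed as a whole and can't be split across tasks, which is fine for modestly sized tweet batches.

    For completeness, the explode() route mentioned in the question also works: read each file as one whole string, parse the top-level array with from_json, then explode into one row per tweet. A sketch, again assuming json_schema is the per-tweet StructType:

      from pyspark.sql.functions import col, explode, from_json
      from pyspark.sql.types import ArrayType

      # Read each file whole, parse the JSON array, then fan out to one row per tweet
      raw = spark.read.text(tweetstore, wholetext=True)
      tweetsDF = (
        raw
          .select(from_json(col("value"), ArrayType(json_schema)).alias("tweets"))
          .select(explode("tweets").alias("tweet"))
          .select("tweet.*")
      )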