Tags: python, dataframe, parsing, pyspark

How to Explode JSON Strings into Multiple Columns using PySpark


I have a DataFrame with one column of string datatype. Each string is the JSON returned by an API request.

After creating the DataFrame and parsing the data, I'm getting the following result:

import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

def fn_process_data(data):

    spark = SparkSession.builder.master('local[*]')\
                                .appName('sparkdf')\
                                .config("spark.log.level", "OFF")\
                                .getOrCreate()

    ### ---------------------------------------------------------------------------

    df = spark.createDataFrame([[json.dumps(d)] for d in data])

Result:

+--------------------+                                                          
|                  _1|
+--------------------+
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
|{"abilities": [{"...|
+--------------------+

After getting that result, I'm trying to explode the rows into new columns, but I end up with an empty DataFrame.

    schema = ArrayType(MapType(StringType(), StringType()))

    df = df.withColumn("_1", F.explode(F.from_json("_1", schema)))\
           .select("_1.abilities",
                   "_1.base_experience",
                   "_1.cries",
                   "_1.forms",
                   "_1.games_indices",
                   "_1.height",
                   "_1.held_items",
                   "_1.id",
                   "_1.is_default",
                   "_1.location_area_encounters",
                   "_1.moves",
                   "_1.name",
                   "_1.order",
                   "_1.past_abilities",
                   "_1.past_types",
                   "_1.species",
                   "_1.sprites",
                   "_1.stats",
                   "_1.types",
                   "_1.weight")

    df.show()

Result:

+---------+---------------+-----+-----+-------------+------+----------+---+----------+------------------------+-----+----+-----+--------------+----------+-------+-------+-----+-----+------+
|abilities|base_experience|cries|forms|games_indices|height|held_items| id|is_default|location_area_encounters|moves|name|order|past_abilities|past_types|species|sprites|stats|types|weight|
+---------+---------------+-----+-----+-------------+------+----------+---+----------+------------------------+-----+----+-----+--------------+----------+-------+-------+-----+-----+------+
+---------+---------------+-----+-----+-------------+------+----------+---+----------+------------------------+-----+----+-----+--------------+----------+-------+-------+-----+-----+------+

The result I'm expecting is the data split into columns, like this:

+--------------------------------------------------------------------------------------+---------------+----------------------- [...]
|abilities                                                                             |base_experience|cries                           
+--------------------------------------------------------------------------------------+---------------+----------------------- [...]
|[{"ability":{"name": "limber", "url": "https://pokeapi.co/api/v2/ability/7/"}, [...]] |101            |{"latest": "https://raw.[...]
|[{"ability":{"name": "pikachu", "url": "https://pokeapi.co/api/v2/ability/3"}, [...]] |103            |{"latest": "https://raw.[...]

How can I achieve this result using functions like "MapType", "explode", etc.?


Solution

  • With from_json() your schema should describe the JSON object's fields as typed structs (with arrays of structs for the nested lists), since the data is more complex than an array of string-to-string maps. That mismatch is also why the attempt above returns an empty DataFrame: from_json() yields null when the top-level JSON is an object but the schema expects an array, and explode() then drops every null row. The data is truncated in the provided examples, but the schema would look something like so:

    from pyspark.sql import functions as F
    
    schema = """
    abilities ARRAY<
      STRUCT<name STRING, url STRING>
    >,
    base_experience INT,
    cries STRUCT<latest STRING, legacy STRING>,
    forms ARRAY<STRUCT<
      id INT,
      form_name STRING
    >>,
    games_indices ARRAY<STRUCT<
      game_index INT,
      generation STRUCT<name STRING, url STRING>
    >>,
    id INT,
    name STRING
    """
    
    df = df.select(
      F.from_json(F.col("_1"), schema)
    )
    

    Note: this example uses a DDL string syntax to define the schema for simplicity. The same effect can still be achieved with typed PySpark or Scala Spark code.
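    To actually split the parsed data into the separate columns shown in the expected output, you can alias the struct returned by from_json() and expand it with a star select. A minimal sketch continuing from the code above (the "parsed" alias is just an illustrative name):

    from pyspark.sql import functions as F

    # Parse the JSON string into a single struct column
    df = df.select(F.from_json(F.col("_1"), schema).alias("parsed"))

    # Promote every field of the struct to a top-level column
    df = df.select("parsed.*")

    df.show(truncate=False)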

    If this is too verbose or you'd like to let the schema evolve more loosely, you can look at the schema_of_json() built-in function as well.
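    For instance, a minimal sketch of inferring the schema from a single sample row with schema_of_json(), assuming the spark session and F import from the snippets above. Note that inferring from one row can miss fields that happen to be null in that sample, so treat the result as a starting point:

    # Pull one raw JSON string out of the DataFrame to use as a sample
    sample = df.select("_1").first()[0]

    # schema_of_json() needs a literal/foldable argument, so wrap the sample
    # in F.lit() and evaluate it against a dummy one-row DataFrame
    ddl = spark.range(1).select(
        F.schema_of_json(F.lit(sample)).alias("ddl")
    ).first()["ddl"]

    # The inferred DDL string can be passed straight to from_json()
    df = df.select(F.from_json("_1", ddl).alias("parsed")).select("parsed.*")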