Tags: json, csv, pyspark, apache-spark-sql

JSON flattening in PySpark with multiple array fields


I am working on a problem where I need to flatten JSON data containing arrays (lists of dictionaries). While flattening, one of the array fields (version) becomes part of the column names. I have a dataframe created from JSON which looks like this:

-id (string)
-as_of_date (string)
-past_features (array) (version can take the values 3months/6months/9months)
  -height (string)
  -width (string)
  -screensize (string)
  -smartdevice (boolean)
  -version (string)
-future_features (array) (version can take the values 3months/6months/9months)
  -height (string)
  -width (string)
  -screensize (string)
  -smartdevice (boolean)
  -version (string)

How do I flatten this to the schema below, so that I create exactly one record per JSON record but with several columns, as shown? Note that the version field is not a column in the schema below; instead it is embedded in the column names, and each version's data is flattened together into a single row. The final flattened schema looks like this:

-id (string)
-as_of_date (string)
-past_features_3m_height (string)
-past_features_3m_width (string)
-past_features_3m_screensize (string)
-past_features_3m_smartdevice (boolean)
-past_features_6m_height (string)
-past_features_6m_width (string)
-past_features_6m_screensize (string)
-past_features_6m_smartdevice (boolean)
-past_features_9m_height (string)
-past_features_9m_width (string)
-past_features_9m_screensize (string)
-past_features_9m_smartdevice (boolean)
-future_features_3m_height (string)
-future_features_3m_width (string)
-future_features_3m_screensize (string)
-future_features_3m_smartdevice (boolean)
-future_features_6m_height (string)
-future_features_6m_width (string)
-future_features_6m_screensize (string)
-future_features_6m_smartdevice (boolean)
-future_features_9m_height (string)
-future_features_9m_width (string)
-future_features_9m_screensize (string)
-future_features_9m_smartdevice (boolean)

Solution

  • You can flatten it using explode and pivot.

    This is a simplified JSONL example created with json-schema-faker, just to try out the solution:

    { "id": "a", "as_of_date": "yes", "past_features": [ { "width": -66105819, "version": "6m", "height": -70464769 }, { "width": 12820987, "version": "6m", "height": 18639815 }, { "width": -23098401, "version": "9m", "height": 77217612 } ] }
    { "id": "c", "as_of_date": "no", "past_features": [ { "width": -4552888, "version": "3m", "height": 98265428 } ] }
    { "id": "b", "as_of_date": "yes", "past_features": [ { "width": 35952886, "version": "3m", "height": -85501281 }, { "width": 35068589, "version": "6m" }, { "width": 97965007, "version": "9m", "height": -59246801 } ] }
    

    First, read the data:

    >>> from pyspark.sql import functions as F
    >>> df = spark.read.json('data.jsonl')
    

    This is the schema we have (I've removed future_features just to keep the example simple and clean):

    >>> df.printSchema()
    root
     |-- as_of_date: string (nullable = true)
     |-- id: string (nullable = true)
     |-- past_features: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- height: long (nullable = true)
     |    |    |-- version: string (nullable = true)
     |    |    |-- width: long (nullable = true)
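
    If you prefer not to rely on schema inference (note that spark.read.json infers long for the numeric fields here, while the question declares them as strings), you can pass an explicit schema instead; a minimal sketch assuming the string-typed fields from the question:

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   BooleanType, ArrayType)

    feature = StructType([
        StructField('height', StringType()),
        StructField('width', StringType()),
        StructField('screensize', StringType()),
        StructField('smartdevice', BooleanType()),
        StructField('version', StringType()),
    ])
    schema = StructType([
        StructField('id', StringType()),
        StructField('as_of_date', StringType()),
        StructField('past_features', ArrayType(feature)),
    ])
    df = spark.read.schema(schema).json('data.jsonl')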
    

    Explode to get one row per item in the past_features array:

    >>> df_exploded = df.withColumn('past_features', F.explode('past_features'))
    >>> df_exploded.show()
    +----------+---+--------------------+
    |as_of_date| id|       past_features|
    +----------+---+--------------------+
    |       yes|  a|{-70464769, 6m, -...|
    |       yes|  a|{18639815, 6m, 12...|
    |       yes|  a|{77217612, 9m, -2...|
    |        no|  c|{98265428, 3m, -4...|
    |       yes|  b|{-85501281, 3m, 3...|
    |       yes|  b|{null, 6m, 35068589}|
    |       yes|  b|{-59246801, 9m, 9...|
    +----------+---+--------------------+
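
    A side note: explode drops records whose past_features array is null or empty. If such records exist in your data and you want to keep them (they would end up with all-null feature columns), explode_outer is a drop-in replacement:

    >>> df_exploded = df.withColumn('past_features', F.explode_outer('past_features'))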
    

    Once we have one row per feature, we group by id and pivot over version to produce one column per version value (i.e., three columns: one for "3m", one for "6m", and one for "9m"):

    >>> fields = ['height', 'width']
    >>> versions = ['3m', '6m', '9m']
    >>> # also group by as_of_date to keep it after grouping (there is probably a more efficient and elegant solution)
    >>> df_pivoted = df_exploded.groupBy('id', 'as_of_date')\
            .pivot('past_features.version', versions)\
            .agg(F.first('past_features'))
    >>> df_pivoted.show()
    +---+----------+--------------------+--------------------+--------------------+
    | id|as_of_date|                  3m|                  6m|                  9m|
    +---+----------+--------------------+--------------------+--------------------+
    |  a|       yes|                null|{-70464769, 6m, -...|{77217612, 9m, -2...|
    |  b|       yes|{-85501281, 3m, 3...|{null, 6m, 35068589}|{-59246801, 9m, 9...|
    |  c|        no|{98265428, 3m, -4...|                null|                null|
    +---+----------+--------------------+--------------------+--------------------+
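
    Two details worth noting. Passing the explicit versions list to pivot spares Spark an extra job to collect the distinct version values first. Also, F.first picks an arbitrary element when an id has more than one entry for the same version (id "a" above has two "6m" entries), so if duplicates matter you will want a deterministic aggregate; as a small hedge, ignorenulls at least prevents picking a null struct:

    >>> df_pivoted = df_exploded.groupBy('id', 'as_of_date')\
            .pivot('past_features.version', versions)\
            .agg(F.first('past_features', ignorenulls=True))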
    

    Lastly, use select to extract the feature fields:

    >>> df_flattened = df_pivoted.select('id', 'as_of_date',
            *[F.col(v + '.' + f).alias(v + '_' + f) for v in versions for f in fields])
    >>> df_flattened.show()
    +---+----------+---------+--------+---------+---------+---------+---------+
    | id|as_of_date|3m_height|3m_width|6m_height| 6m_width|9m_height| 9m_width|
    +---+----------+---------+--------+---------+---------+---------+---------+
    |  a|       yes|     null|    null|-70464769|-66105819| 77217612|-23098401|
    |  b|       yes|-85501281|35952886|     null| 35068589|-59246801| 97965007|
    |  c|        no| 98265428|-4552888|     null|     null|     null|     null|
    +---+----------+---------+--------+---------+---------+---------+---------+
    

    And that's it! Adding the fields I have ignored for the sake of simplicity should be pretty straightforward; a sketch of the full version follows below.
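
    For completeness, here is a sketch of the end-to-end version covering both arrays and the full column prefixes from the question (using the simplified 3m/6m/9m labels from the example; the helper name flatten_array is my own, and future_features is assumed to have the same struct fields as past_features):

    fields = ['height', 'width', 'screensize', 'smartdevice']
    versions = ['3m', '6m', '9m']

    def flatten_array(df, array_col):
        # One row per array element, keeping only the grouping keys and the struct.
        exploded = df.select('id', 'as_of_date',
                             F.explode(array_col).alias('feat'))
        # One column per version value.
        pivoted = exploded.groupBy('id', 'as_of_date')\
            .pivot('feat.version', versions)\
            .agg(F.first('feat'))
        # Unpack each struct into prefixed columns, e.g. past_features_3m_height.
        return pivoted.select('id', 'as_of_date',
            *[F.col(v + '.' + f).alias(array_col + '_' + v + '_' + f)
              for v in versions for f in fields])

    past = flatten_array(df, 'past_features')
    future = flatten_array(df, 'future_features')
    result = past.join(future, ['id', 'as_of_date'], 'outer')

    Flattening each array separately and joining at the end avoids the cross product you would get from exploding both arrays in the same dataframe, and the outer join keeps records that only have one of the two arrays populated.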