
PySpark: How to flatten a column with an array of dictionaries and embedded dictionaries (sparknlp annotator output)


I'm trying to extract the output from sparknlp (using the pretrained pipeline 'explain_document_dl'). I have spent a lot of time looking for ways to do this (UDFs, explode, etc.) but cannot get anywhere close to a workable solution. Say I want to extract the values under result and metadata from the column entities. That column contains an array of multiple dictionaries.

When I use df.withColumn("entity_name", explode("entities.result")), only the value from the first dictionary is extracted.

The content of the "entities" column is a list of dictionaries.
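
For context, exploding the array column itself (rather than the field path entities.result) should yield one row per dictionary, from which the struct fields can then be selected. A minimal sketch, assuming df holds the entities column described above:

from pyspark.sql.functions import explode, col

# one output row per dictionary in the `entities` array
exploded = df.withColumn("entity", explode(col("entities")))
# then select the struct fields of interest
exploded.select(col("entity.result"), col("entity.metadata.entity")).show()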

Here is an attempt to re-create the dataframe as a reproducible example (thanks to the suggestion provided by @jonathan below):

# content of one cell as an example:
d = [{"annotatorType":"chunk","begin":2740,"end":2747,"result":"•Ability","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}, {"annotatorType":"chunk","begin":2740,"end":2747,"result":"Fedex","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}]

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField('annotatorType', StringType(), True),
                     StructField('begin', IntegerType(), True),
                     StructField('end', IntegerType(), True),
                     StructField('result', StringType(), True),
                     StructField('sentence', StringType(), True),
                     StructField('chunk', StringType(), True),
                     StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                      StructField('sentence', StringType(), True),
                                                      StructField('chunk', StringType(), True)
                                                      )), True),
                     StructField('embeddings', StringType(), True),
                     StructField('sentence_embeddings', StringType(), True)
                    ]
                   )

df = spark.createDataFrame(d, schema=schema)
df.show()

In this simple case, where the data is a flat list of dictionaries, it works:

+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|  result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|        chunk| 2740|2747|•Ability|    null| null|[ORG, 8, 22]|        []|                 []|
|        chunk| 2740|2747|   Fedex|    null| null|[ORG, 8, 22]|        []|                 []|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
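
As an aside, the struct fields inside metadata can already be reached with dotted paths on this flat dataframe, e.g.:

# pull a single field out of the metadata struct
df.select("result", "metadata.entity").show()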

But I am stuck on how to apply this to a column whose cells contain an array of multiple dictionaries (so multiple records per original cell).

I tried to apply the same schema to the entities column; I had to convert the column to JSON first:

ent1 = ent1.withColumn("entities2", to_json("entities"))

[screenshot: the dataframe after adding the entities2 JSON column]

It works for cells with an array of one dictionary, but returns null for cells whose array has multiple dictionaries (the 4th row):

ent1.withColumn("entities2", from_json("entities2", schema)).select("entities2.*").show()

+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|        chunk|  166| 169|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   11|  14|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   52|  55|  Lyft|    null| null|[MISC, 1, 0]|        []|                 []|
|         null| null|null|  null|    null| null|        null|      null|               null|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
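
I suspect the null shows up because from_json is given a StructType while that cell actually holds a JSON array of several dictionaries. Wrapping the schema in ArrayType and then exploding might handle any number of dictionaries per cell; a sketch of that idea (array schemas in from_json need Spark 2.2+), reusing the schema defined above:

from pyspark.sql.functions import from_json, explode
from pyspark.sql.types import ArrayType

# parse the JSON string as an array of structs, not a single struct
parsed = ent1.withColumn("entities2", from_json("entities2", ArrayType(schema)))
# one row per dictionary, however many the array holds
parsed.select(explode("entities2").alias("e")).select("e.*").show()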

The desired output is

+-------------+-----+----+----------------+------------------------+----------+-------------------+
|annotatorType|begin| end|          result|                metadata|embeddings|sentence_embeddings|
+-------------+-----+----+----------------+------------------------+----------+-------------------+
|        chunk|  166| 169|            Lyft|                  [MISC]|        []|                 []|
|        chunk|   11|  14|            Lyft|                  [MISC]|        []|                 []|
|        chunk|   52|  55|            Lyft|                  [MISC]|        []|                 []|
|        chunk| [..]|[..]|[Lyft, Lyft,    |[MISC, MISC, MISC,      |        []|                 []|
|             |     |    |FedEx Ground ..]|ORG, LOC, ORG, ORG,     |          |                   |
|             |     |    |                |ORG, ORG]               |          |                   |
+-------------+-----+----+----------------+------------------------+----------+-------------------+
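
If dotted field access works on arrays the way it does on plain structs, each field path should collapse to an array of that field's values per row, which is essentially the desired output. A minimal sketch against the original entities column (no JSON round-trip needed):

from pyspark.sql.functions import col

# `entities.result` yields an array of all result values per row,
# `entities.metadata.entity` an array of all entity labels per row
df.select(col("entities.result").alias("result"),
          col("entities.metadata.entity").alias("metadata_entity")).show(truncate=False)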

I also tried converting each row to JSON, but I lose track of the original row and am given flattened JSON:

new_df = sqlContext.read.json(ent2.rdd.map(lambda r: r.entities2))
new_df.show()
+-------------+-----+----------+----+------------+----------------+-------------------+
|annotatorType|begin|embeddings| end|    metadata|          result|sentence_embeddings|
+-------------+-----+----------+----+------------+----------------+-------------------+
|        chunk|  166|        []| 169|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   11|        []|  14|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   52|        []|  55|[0, MISC, 1]|            Lyft|                 []|
|        chunk|    0|        []|  11| [0, ORG, 0]|    FedEx Ground|                 []|
|        chunk|  717|        []| 720| [1, LOC, 4]|            Dock|                 []|
|        chunk|  811|        []| 816| [2, ORG, 5]|          Parcel|                 []|
|        chunk| 1080|        []|1095| [3, ORG, 6]|Parcel Assistant|                 []|
|        chunk| 1102|        []|1108| [4, ORG, 7]|         • Daily|                 []|
|        chunk| 1408|        []|1417| [5, ORG, 8]|      Assistants|                 []|
+-------------+-----+----------+----+------------+----------------+-------------------+
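
One way to keep track of the original row with this approach might be to tag each row with an id before serializing, so the id travels through the JSON. A sketch (column names as above; row_id is just a made-up tag):

from pyspark.sql.functions import monotonically_increasing_id, to_json, struct

# tag each source row, then serialize the id together with the entities
tagged = (ent1.withColumn("row_id", monotonically_increasing_id())
              .withColumn("payload", to_json(struct("row_id", "entities"))))
new_df = spark.read.json(tagged.rdd.map(lambda r: r.payload))
# new_df now carries row_id alongside an entities array to explode per source row
new_df.show()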

I tried to apply a UDF to walk the array inside "entities":

from collections import defaultdict

def flatten(my_dict):
    d_result = defaultdict(list)
    for sub in my_dict:
        val = sub['result']
        d_result["result"].append(val)
    return d_result["result"]

ent = ent.withColumn('result', flatten(df.entities))

TypeError: Column is not iterable
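
In hindsight the error makes sense: flatten is a plain Python function being called on a Column object. Registering it as a UDF with an explicit return type should make it applicable row by row; a sketch of that version (the decorator form assumes Spark 2.3+):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# wrap the flattening logic as a UDF so Spark applies it to each row's array
@udf(returnType=ArrayType(StringType()))
def flatten_results(entities):
    return [sub['result'] for sub in entities]

ent = ent.withColumn('result', flatten_results(ent.entities))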

I found the post Apache Spark Read JSON With Extra Columns, which closely resembles my problem, but after converting the column entities to JSON I still couldn't solve it with the solutions provided there.

Any help is appreciated! Ideally solutions in Python, but examples in Scala are helpful too!


Solution

  • The reason you are getting null is that the schema variable doesn't exactly represent the list of dictionaries you are passing in as data:

        from pyspark.shell import *
        from pyspark.sql.types import *

        # d1 is the list of dictionaries from the question (the variable `d` above)
        schema = StructType([StructField('result', StringType(), True),
                             StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                                 StructField('sentence', StringType(), True),
                                                                 StructField('chunk', StringType(), True))), True)])

        df = spark.createDataFrame(d1, schema=schema)
        df.show()
    

    If you prefer a customised solution, you can try a pure Python/pandas approach:

        import pandas as pd
        from pyspark.shell import *
    
        result = []
        metadata_entity = []
        for row in d1:
            result.append(row.get('result'))
            metadata_entity.append(row.get('metadata').get('entity'))
    
        schema = {'result': [result], 'metadata.entity': [metadata_entity]}
        pandas_df = pd.DataFrame(schema)
    
        df = spark.createDataFrame(pandas_df)
        df.show()
    
        # specific columns
        df.select('result','metadata.entity').show()
    

    EDIT

    IMHO, after reading all the approaches you have been trying, I think sc.parallelize still does the trick for pretty complex cases. I don't have your original variable, but I can OCR your image and take it from there, although the Classroom Teacher and Instructional values are no longer present. Hopefully it will still be useful.

    You can always create a mock dataframe with the structure you need and get its schema
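
    For instance, a minimal sketch of that mock-dataframe trick (the sample record here is made up):

        import json

        # infer a schema from a tiny mock record instead of writing it by hand
        mock_record = {'result': 'Lyft', 'metadata': {'entity': 'MISC'}}
        mock = spark.read.json(sc.parallelize([json.dumps(mock_record)]))
        inferred_schema = mock.schema  # reuse it in from_json or createDataFrame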

    For complex cases with nested data types, you can use the SparkContext to parallelize the raw records and let spark.read.json infer the nested schema:

        import itertools
    
        from pyspark.shell import *
        from pyspark.sql.functions import *
        from pyspark.sql.types import *
    
        # assume two lists under two dictionary keys to make up the sample cells
        # since entities2 is all I have to work from, I just replicate it
        sample = {
            'single_list': [{'annotatorType': 'chunk', 'begin': '166', 'end': '169', 'result': 'Lyft',
                             'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
                             'sentence_embeddings': []},
                            {'annotatorType': 'chunk', 'begin': '11', 'end': '14', 'result': 'Lyft',
                             'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
                             'sentence_embeddings': []},
                            {'annotatorType': 'chunk', 'begin': '52', 'end': '55', 'result': 'Lyft',
                             'metadata': {'entity': 'MISC', 'sentence': '1', 'chunk': '0'}, 'embeddings': [],
                             'sentence_embeddings': []}],
            'frankenstein': [
                {'annotatorType': 'chunk', 'begin': '0', 'end': '11', 'result': 'FedEx Ground',
                 'metadata': {'entity': 'ORG', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
                 'sentence_embeddings': []},
                {'annotatorType': 'chunk', 'begin': '717', 'end': '720', 'result': 'Dock',
                 'metadata': {'entity': 'LOC', 'sentence': '4', 'chunk': '1'}, 'embeddings': [],
                 'sentence_embeddings': []},
                {'annotatorType': 'chunk', 'begin': '811', 'end': '816', 'result': 'Parcel',
                 'metadata': {'entity': 'ORG', 'sentence': '5', 'chunk': '2'}, 'embeddings': [],
                 'sentence_embeddings': []},
                {'annotatorType': 'chunk', 'begin': '1080', 'end': '1095', 'result': 'Parcel Assistant',
                 'metadata': {'entity': 'ORG', 'sentence': '6', 'chunk': '3'}, 'embeddings': [],
                 'sentence_embeddings': []},
                {'annotatorType': 'chunk', 'begin': '1102', 'end': '1108', 'result': '* Daily',
                 'metadata': {'entity': 'ORG', 'sentence': '7', 'chunk': '4'}, 'embeddings': [],
                 'sentence_embeddings': []},
                {'annotatorType': 'chunk', 'begin': '1408', 'end': '1417', 'result': 'Assistants',
                 'metadata': {'entity': 'ORG', 'sentence': '8', 'chunk': '5'}, 'embeddings': [],
                 'sentence_embeddings': []}]
        }
    
        # since they are structurally different, get two dataframes
        df_single_list = spark.read.json(sc.parallelize(sample.get('single_list')))
        df_frankenstein = spark.read.json(sc.parallelize(sample.get('frankenstein')))
    
        # print a blank line before the first table border
        print('\n')
    
        # lists that will collect each column's values for the new dataframe
        annotatorType = []
        begin = []
        embeddings = []
        end = []
        metadata = []
        result = []
        sentence_embeddings = []
    
        # PEP8 would favour a named function (or a proper UDF) over this lambda,
        # and a dictionary of actions could replace the chain of if statements
        function_metadata = lambda x: [x.entity]
        for i in df_frankenstein.columns:
            if i == 'annotatorType':
                annotatorType.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
            if i == 'begin':
                begin.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
            if i == 'embeddings':
                embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
            if i == 'end':
                end.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
            if i == 'metadata':
                _temp = list(map(function_metadata, df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect()))
                metadata.append(list(itertools.chain.from_iterable(_temp)))
            if i == 'result':
                result.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
            if i == 'sentence_embeddings':
                sentence_embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
    
        # headers
        annotatorType_header = 'annotatorType'
        begin_header = 'begin'
        embeddings_header = 'embeddings'
        end_header = 'end'
        metadata_header = 'metadata'
        result_header = 'result'
        sentence_embeddings_header = 'sentence_embeddings'
        metadata_entity_header = 'metadata.entity'
    
        frankenstein_schema = StructType(
            [StructField(annotatorType_header, ArrayType(StringType())),
             StructField(begin_header, ArrayType(StringType())),
             StructField(embeddings_header, ArrayType(StringType())),
             StructField(end_header, ArrayType(StringType())),
             StructField(metadata_header, ArrayType(StringType())),
             StructField(result_header, ArrayType(StringType())),
             StructField(sentence_embeddings_header, ArrayType(StringType()))
             ])
    
        # list of lists of lists of lists of ... lists
        frankenstein_list = [[annotatorType, begin, embeddings, end, metadata, result, sentence_embeddings]]
        df_frankenstein = spark.createDataFrame(frankenstein_list, schema=frankenstein_schema)
    
        print(df_single_list.schema)
        print(df_frankenstein.schema)
    
        # let's see how it is
        df_single_list.select(
            annotatorType_header,
            begin_header,
            end_header,
            result_header,
            array(metadata_entity_header),
            embeddings_header,
            sentence_embeddings_header).show()
    
        # let's see again
        df_frankenstein.select(
            annotatorType_header,
            begin_header,
            end_header,
            result_header,
            metadata_header,
            embeddings_header,
            sentence_embeddings_header).show()
    

    Output:

        StructType(List(StructField(annotatorType,StringType,true),StructField(begin,StringType,true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,StringType,true),StructField(metadata,StructType(List(StructField(chunk,StringType,true),StructField(entity,StringType,true),StructField(sentence,StringType,true))),true),StructField(result,StringType,true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
        StructType(List(StructField(annotatorType,ArrayType(StringType,true),true),StructField(begin,ArrayType(StringType,true),true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,ArrayType(StringType,true),true),StructField(metadata,ArrayType(StringType,true),true),StructField(result,ArrayType(StringType,true),true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
    
        +-------------+-----+---+------+----------------------+----------+-------------------+
        |annotatorType|begin|end|result|array(metadata.entity)|embeddings|sentence_embeddings|
        +-------------+-----+---+------+----------------------+----------+-------------------+
        |        chunk|  166|169|  Lyft|                [MISC]|        []|                 []|
        |        chunk|   11| 14|  Lyft|                [MISC]|        []|                 []|
        |        chunk|   52| 55|  Lyft|                [MISC]|        []|                 []|
        +-------------+-----+---+------+----------------------+----------+-------------------+
        +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
        |       annotatorType|               begin|                 end|              result|            metadata|          embeddings| sentence_embeddings|
        +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
        |[[chunk, chunk, c...|[[0, 717, 811, 10...|[[11, 720, 816, 1...|[[FedEx Ground, D...|[[ORG, LOC, ORG, ...|[[[], [], [], [],...|[[[], [], [], [],...|
        +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    

    You will have to select from each dataframe separately, since their datatypes differ, but the content is ready to consume (if I understood your requirement correctly from the desired output).

    ( ͡° ͜ʖ ͡°)