I'm trying to extract the output from Spark NLP (using the pretrained pipeline 'explain_document_dl'). I have spent a lot of time looking for ways (UDFs, explode, etc.) but cannot get anywhere close to a workable solution. Say I want to extract the values under result and metadata from the column entities. In that column there is an array containing multiple dictionaries.
When I use df.withColumn("entity_name", explode("entities.result")), only the value from the first dictionary is extracted. The content of the "entities" column is a list of dictionaries.
An attempt to provide a reproducible example / re-create the dataframe (thanks to the suggestion provided by @jonathan below):
# content of one cell as an example:
d = [{"annotatorType":"chunk","begin":2740,"end":2747,"result":"•Ability","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}, {"annotatorType":"chunk","begin":2740,"end":2747,"result":"Fedex","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}]
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField('annotatorType', StringType(), True),
StructField('begin', IntegerType(), True),
StructField('end', IntegerType(), True),
StructField('result', StringType(), True),
StructField('sentence', StringType(), True),
StructField('chunk', StringType(), True),
StructField('metadata', StructType((StructField('entity', StringType(), True),
StructField('sentence', StringType(), True),
StructField('chunk', StringType(), True)
)), True),
StructField('embeddings', StringType(), True),
StructField('sentence_embeddings', StringType(), True)
]
)
df = spark.createDataFrame(d, schema=schema)
df.show()
In this case, with a single list of dictionaries, it works:
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end| result|sentence|chunk| metadata|embeddings|sentence_embeddings|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
| chunk| 2740|2747|•Ability| null| null|[ORG, 8, 22]| []| []|
| chunk| 2740|2747| Fedex| null| null|[ORG, 8, 22]| []| []|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
But I am stuck on how to apply this to a column whose cells contain an array of multiple dictionaries (so multiple rows per original cell). I tried to apply the same schema to the entities column, after first converting the column to json:
from pyspark.sql.functions import to_json, from_json
ent1 = ent1.withColumn("entities2", to_json("entities"))
It works for cells with an array of a single dictionary, but gives null for cells that have an array of multiple dictionaries (the 4th row):
ent1.withColumn("entities2", from_json("entities2", schema)).select("entities2.*").show()
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|result|sentence|chunk| metadata|embeddings|sentence_embeddings|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
| chunk| 166| 169| Lyft| null| null|[MISC, 0, 0]| []| []|
| chunk| 11| 14| Lyft| null| null|[MISC, 0, 0]| []| []|
| chunk| 52| 55| Lyft| null| null|[MISC, 1, 0]| []| []|
| null| null|null| null| null| null| null| null| null|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
The desired output is:
+-------------+-----+----+---------------+------------------------+----------+-------------------+
|annotatorType|begin| end|         result|                metadata|embeddings|sentence_embeddings|
+-------------+-----+----+---------------+------------------------+----------+-------------------+
|        chunk|  166| 169|           Lyft|                  [MISC]|        []|                 []|
|        chunk|   11|  14|           Lyft|                  [MISC]|        []|                 []|
|        chunk|   52|  55|           Lyft|                  [MISC]|        []|                 []|
|        chunk| [..]|[..]|    [Lyft,Lyft,|        [MISC,MISC,MISC,|        []|                 []|
|             |     |    |FedEx Ground..]|ORG,LOC,ORG,ORG,ORG,ORG]|          |                   |
+-------------+-----+----+---------------+------------------------+----------+-------------------+
I also tried converting each row to json, but I lose track of the original rows and am given flattened json:
new_df = sqlContext.read.json(ent2.rdd.map(lambda r: r.entities2))
new_df.show()
+-------------+-----+----------+----+------------+----------------+-------------------+
|annotatorType|begin|embeddings| end| metadata| result|sentence_embeddings|
+-------------+-----+----------+----+------------+----------------+-------------------+
| chunk| 166| []| 169|[0, MISC, 0]| Lyft| []|
| chunk| 11| []| 14|[0, MISC, 0]| Lyft| []|
| chunk| 52| []| 55|[0, MISC, 1]| Lyft| []|
| chunk| 0| []| 11| [0, ORG, 0]| FedEx Ground| []|
| chunk| 717| []| 720| [1, LOC, 4]| Dock| []|
| chunk| 811| []| 816| [2, ORG, 5]| Parcel| []|
| chunk| 1080| []|1095| [3, ORG, 6]|Parcel Assistant| []|
| chunk| 1102| []|1108| [4, ORG, 7]| • Daily| []|
| chunk| 1408| []|1417| [5, ORG, 8]| Assistants| []|
+-------------+-----+----------+----+------------+----------------+-------------------+
I tried to apply a UDF to go through the array of dictionaries inside "entities":
from collections import defaultdict

def flatten(my_dict):
    d_result = defaultdict(list)
    for sub in my_dict:
        val = sub['result']
        d_result["result"].append(val)
    return d_result["result"]

ent = ent.withColumn('result', flatten(df.entities))
TypeError: Column is not iterable
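Presumably this fails because flatten is applied to the Column object itself on the driver rather than to the row values. A minimal sketch of what I believe the UDF wrapping should look like (assuming entities is an array of structs):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

# sketch: register flatten as a UDF with an explicit return type so Spark
# applies it per row; each element of the array arrives as a Row, so
# sub['result'] still works inside the UDF
flatten_udf = udf(flatten, ArrayType(StringType()))
ent = ent.withColumn('result', flatten_udf(col('entities')))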
I found the post Apache Spark Read JSON With Extra Columns, which closely resembles my problem, but after converting the column entities to json, I still can't solve it with the solutions provided in that post.
Any help is appreciated! Ideally solutions in Python, but examples in Scala are helpful too!
The reason you are getting null is that the schema variable doesn't exactly represent the list of dictionaries you are passing in as data:
from pyspark.shell import *
from pyspark.sql.types import *
schema = StructType([StructField('result', StringType(), True),
StructField('metadata', StructType((StructField('entity', StringType(), True),
StructField('sentence', StringType(), True),
StructField('chunk', StringType(), True))), True)])
df = spark.createDataFrame(d1, schema=schema)  # d1 is your list of dictionaries
df.show()
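Since each cell of entities actually holds an array of such dictionaries, the fix for your from_json attempt (a sketch, reusing the ent1 and full schema from your question) is to declare the column as an array of structs and explode, so cells with multiple dictionaries no longer come back as null:
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType

# parse entities2 as an ARRAY of structs instead of a single struct,
# then explode so each dictionary becomes its own row
array_schema = ArrayType(schema)
parsed = ent1.withColumn("entities2", from_json("entities2", array_schema))
parsed.select(explode(col("entities2")).alias("e")).select("e.*").show()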
If you prefer a customised solution, you can try a pure python/pandas approach:
import pandas as pd
from pyspark.shell import *
result = []
metadata_entity = []
for row in d1:
result.append(row.get('result'))
metadata_entity.append(row.get('metadata').get('entity'))
schema = {'result': [result], 'metadata.entity': [metadata_entity]}
pandas_df = pd.DataFrame(schema)
df = spark.createDataFrame(pandas_df)
df.show()
# specific columns
df.select('result','metadata.entity').show()
EDIT
IMHO, after reading all the approaches you have been trying, I think sc.parallelize still does the trick for pretty complex cases. I don't have your original variable, but I can OCR your image and take it from there (although the Classroom Teacher and Instructional values are no longer present). Hopefully it will still be useful.
You can always create a mock dataframe with the structure you need and get its schema, as in the sketch below.
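For instance (a minimal sketch; the one-record JSON string is made up for illustration):
# infer a nested schema from a single mock JSON record and reuse it elsewhere
mock = spark.read.json(sc.parallelize(['{"result": "Lyft", "metadata": {"entity": "MISC"}}']))
print(mock.schema)  # a StructType you can pass to from_json or createDataFrame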
For complex cases with nested data types, you may use the SparkContext and read the resulting JSON format:
import itertools
from pyspark.shell import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# assume two lists in two dictionary keys to make four cells
# since I don't have anything but entities2, I can just replicate it
sample = {
'single_list': [{'annotatorType': 'chunk', 'begin': '166', 'end': '169', 'result': 'Lyft',
'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '11', 'end': '14', 'result': 'Lyft',
'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '52', 'end': '55', 'result': 'Lyft',
'metadata': {'entity': 'MISC', 'sentence': '1', 'chunk': '0'}, 'embeddings': [],
'sentence_embeddings': []}],
'frankenstein': [
{'annotatorType': 'chunk', 'begin': '0', 'end': '11', 'result': 'FedEx Ground',
'metadata': {'entity': 'ORG', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '717', 'end': '720', 'result': 'Dock',
'metadata': {'entity': 'LOC', 'sentence': '4', 'chunk': '1'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '811', 'end': '816', 'result': 'Parcel',
'metadata': {'entity': 'ORG', 'sentence': '5', 'chunk': '2'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '1080', 'end': '1095', 'result': 'Parcel Assistant',
'metadata': {'entity': 'ORG', 'sentence': '6', 'chunk': '3'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '1102', 'end': '1108', 'result': '* Daily',
'metadata': {'entity': 'ORG', 'sentence': '7', 'chunk': '4'}, 'embeddings': [],
'sentence_embeddings': []},
{'annotatorType': 'chunk', 'begin': '1408', 'end': '1417', 'result': 'Assistants',
'metadata': {'entity': 'ORG', 'sentence': '8', 'chunk': '5'}, 'embeddings': [],
'sentence_embeddings': []}]
}
# since they are structurally different, get two dataframes
df_single_list = spark.read.json(sc.parallelize(sample.get('single_list')))
df_frankenstein = spark.read.json(sc.parallelize(sample.get('frankenstein')))
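# note (assumption about spark.read.json): it expects JSON strings, so when
# handed dicts it falls back on str() conversion of each element; explicitly
# serialising first would be the safer variant, e.g.:
#   import json
#   spark.read.json(sc.parallelize([json.dumps(x) for x in sample['single_list']]))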
# print a newline so the first table border displays cleanly
print('\n')
# lists to collect each column's values for the new dataframe
annotatorType = []
begin = []
embeddings = []
end = []
metadata = []
result = []
sentence_embeddings = []
# PEP 8 would favour a named function (or a proper UDF) over an assigned lambda;
# a dictionary of actions could also avoid the chain of IF statements
function_metadata = lambda x: [x.entity]
for i in df_frankenstein.columns:
if i == 'annotatorType':
annotatorType.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
if i == 'begin':
begin.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
if i == 'embeddings':
embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
if i == 'end':
end.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
if i == 'metadata':
_temp = list(map(function_metadata, df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect()))
metadata.append(list(itertools.chain.from_iterable(_temp)))
if i == 'result':
result.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
if i == 'sentence_embeddings':
sentence_embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
# headers
annotatorType_header = 'annotatorType'
begin_header = 'begin'
embeddings_header = 'embeddings'
end_header = 'end'
metadata_header = 'metadata'
result_header = 'result'
sentence_embeddings_header = 'sentence_embeddings'
metadata_entity_header = 'metadata.entity'
frankenstein_schema = StructType(
[StructField(annotatorType_header, ArrayType(StringType())),
StructField(begin_header, ArrayType(StringType())),
StructField(embeddings_header, ArrayType(StringType())),
StructField(end_header, ArrayType(StringType())),
StructField(metadata_header, ArrayType(StringType())),
StructField(result_header, ArrayType(StringType())),
StructField(sentence_embeddings_header, ArrayType(StringType()))
])
# list of lists of lists of lists of ... lists
frankenstein_list = [[annotatorType, begin, embeddings, end, metadata, result, sentence_embeddings]]
df_frankenstein = spark.createDataFrame(frankenstein_list, schema=frankenstein_schema)
print(df_single_list.schema)
print(df_frankenstein.schema)
# let's see how it is
df_single_list.select(
annotatorType_header,
begin_header,
end_header,
result_header,
array(metadata_entity_header),
embeddings_header,
sentence_embeddings_header).show()
# let's see again
df_frankenstein.select(
annotatorType_header,
begin_header,
end_header,
result_header,
metadata_header,
embeddings_header,
sentence_embeddings_header).show()
Output:
StructType(List(StructField(annotatorType,StringType,true),StructField(begin,StringType,true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,StringType,true),StructField(metadata,StructType(List(StructField(chunk,StringType,true),StructField(entity,StringType,true),StructField(sentence,StringType,true))),true),StructField(result,StringType,true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
StructType(List(StructField(annotatorType,ArrayType(StringType,true),true),StructField(begin,ArrayType(StringType,true),true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,ArrayType(StringType,true),true),StructField(metadata,ArrayType(StringType,true),true),StructField(result,ArrayType(StringType,true),true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
+-------------+-----+---+------+----------------------+----------+-------------------+
|annotatorType|begin|end|result|array(metadata.entity)|embeddings|sentence_embeddings|
+-------------+-----+---+------+----------------------+----------+-------------------+
| chunk| 166|169| Lyft| [MISC]| []| []|
| chunk| 11| 14| Lyft| [MISC]| []| []|
| chunk| 52| 55| Lyft| [MISC]| []| []|
+-------------+-----+---+------+----------------------+----------+-------------------+
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| annotatorType| begin| end| result| metadata| embeddings| sentence_embeddings|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|[[chunk, chunk, c...|[[0, 717, 811, 10...|[[11, 720, 816, 1...|[[FedEx Ground, D...|[[ORG, LOC, ORG, ...|[[[], [], [], [],...|[[[], [], [], [],...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
You will have to select from each dataframe separately, since their datatypes differ, but the content is ready to consume (if I understood your requirement from the desired output).
( ͡° ͜ʖ ͡°)