I'm trying to flatten data in an RDD. Each element of the RDD is a 4-tuple: the first element is the primary_id, the second is a list of dictionaries, and the third and fourth are each a list containing a single dictionary.
rdd = [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
        [{'pol_cat_id':'234','pol_dt':'20100220'}],
        [{'qor_pol_id':'23492','qor_cd':'30'}]),
       ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
        [{'pol_cat_id':'532','pol_dt':'20091020'}],
        [{'qor_pol_id':'49320','qor_cd':'21'}]) ]
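I want to flatten the data so that each dictionary in the second element becomes its own row, with the primary_id and the fields from the third and fourth elements repeated on every row.
How would I do this in PySpark?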
Here is what I have attempted, but it gives me a "too many values to unpack" error:
def flatten_map(record):
    try:
        yield(record)
        # Unpack items
        id, items, line, pls = record
        pol_id = pls["pol_cat_id"]
        pol_dt = pls["pol_dt"]
        qor_id = pls["qor_pol_id"]
        for item in items:
            yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
    except Exception as e:
        pass

result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0], ))
I can post the complete traceback if required, but for the sake of brevity:
ValueError: too many values to unpack (expected 2)
Note: converting to pandas is not an option because the RDD is too big.
IIUC, you can run flatMap() with a list comprehension that iterates over the 2nd item of each 4-item tuple (1 string + 3 lists), for example:
from pyspark.sql import Row
myrdd = sc.parallelize(rdd)
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
# {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
# {'pol_cat_id': '234', 'pol_dt': '20100220'},
# {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......
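The function passed to flatMap is plain Python, so it can be checked locally on the sample data before running it on the cluster. Below is a minimal sketch of the same list comprehension, assuming the question's data layout (shortened to one record); the helper name `explode_record` is hypothetical, and the nested list comprehension only stands in for what `flatMap` does (concatenating the lists returned per record).

```python
def explode_record(x):
    """Turn one (primary_id, covs, [pol], [qor]) tuple into one tuple per coverage dict."""
    primary_id, covs, pol_list, qor_list = x
    # One output tuple per coverage dictionary; the single dicts inside the
    # 3rd and 4th elements are repeated on every output tuple.
    return [({'primary_id': primary_id}, cov, pol_list[0], qor_list[0]) for cov in covs]

sample = [('xxxxx99', [{'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
                       {'cov_id': 'Q', 'cov_cd': '33', 'cov_amt': '200', 'cov_state': 'AZ'},
                       {'cov_id': 'Q', 'cov_cd': '64', 'cov_amt': '10', 'cov_state': 'AZ'}],
           [{'pol_cat_id': '234', 'pol_dt': '20100220'}],
           [{'qor_pol_id': '23492', 'qor_cd': '30'}])]

# Local stand-in for rdd.flatMap(explode_record): concatenate the per-record lists.
flattened = [t for rec in sample for t in explode_record(rec)]
for t in flattened:
    print(t)
```

On the cluster the same function could be passed directly, e.g. `myrdd.flatMap(explode_record)`, which is equivalent to the lambda above.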
Short explanation: in the list comprehension inside flatMap, besides iterating over the 2nd item x[1] (each z is a dictionary), I also converted the first item x[0], a string, into a dictionary with one entry, {"primary_id": x[0]}, and took the first item of x[2] and x[3], both of which are dictionaries.
Thus, after running the above flatMap function, each RDD element becomes a tuple of 4 dictionaries, and all that's left is to merge them. Below is my sample code to map each tuple of 4 dictionaries into a Row object; you might have to change how it handles exceptions and missing fields to fit your own requirements.
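cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']

def merge_dict(arr, cols):
    row = {}
    try:
        # merge every dict in the tuple into one flat dict (later dicts win on duplicate keys)
        for e in arr:
            if type(e) is dict: row.update(e)
    except:
        pass
    finally:
        # keep only the listed columns (missing ones become None); return None if nothing was merged
        return Row(**dict({ c:row.get(c, None) for c in cols })) if row else None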
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
.map(lambda x: merge_dict(x, cols)) \
.filter(bool) \
.toDF() \
.show()
+-------+------+------+---------+----------+--------+----------+------+----------+
|cov_amt|cov_cd|cov_id|cov_state|pol_cat_id| pol_dt|primary_id|qor_cd|qor_pol_id|
+-------+------+------+---------+----------+--------+----------+------+----------+
| 100| 100| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 200| 33| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 10| 64| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 100| 20| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 500| 44| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 50| 66| R| TX| 532|20091020| xxxxx86| 21| 49320|
+-------+------+------+---------+----------+--------+----------+------+----------+
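The columns above come out in alphabetical order rather than in the order of cols, most likely because in Spark versions before 3.0 `Row(**kwargs)` sorts its fields by name. If you want the cols order instead, one option (a sketch, not the only way) is to emit plain tuples in cols order and pass the column names to `toDF`. The helper `to_tuple` is hypothetical and is shown as plain Python so it can be tested without a cluster.

```python
# Column order used in the answer above.
cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state',
        'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']

def to_tuple(dicts, cols):
    """Merge a tuple of dicts and return the values in `cols` order (None if missing)."""
    merged = {}
    for d in dicts:
        if isinstance(d, dict):
            merged.update(d)  # later dicts win on duplicate keys
    return tuple(merged.get(c) for c in cols)

# One element as produced by the flatMap step above.
element = ({'primary_id': 'xxxxx99'},
           {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
           {'pol_cat_id': '234', 'pol_dt': '20100220'},
           {'qor_pol_id': '23492', 'qor_cd': '30'})

result = to_tuple(element, cols)
print(result)
# ('xxxxx99', 'Q', '100', '100', 'AZ', '234', '20100220', '23492', '30')
```

On the cluster this could replace the merge_dict/filter steps, e.g. `myrdd.flatMap(...).map(lambda x: to_tuple(x, cols)).toDF(cols)`, and the column order would then follow cols. Note that schema inference can fail if a column is None in every sampled row; passing an explicit schema avoids that.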
BTW, if you want to make your original function work, check the following 5 lines marked with #<--:
def flatten_map(record):
    try:
        #yield(record)  #<-- comment this out, the unprocessed record is not needed in the output
        # Unpack items
        id, items, line, pls = record
        pol_id = line[0]["pol_cat_id"]   #<-- from line[0], not pls
        pol_dt = line[0]["pol_dt"]       #<-- from line[0], not pls
        qor_id = pls[0]["qor_pol_id"]    #<-- from pls[0], not pls
        for item in items:
            #<-- the line below drops the trailing ", 1", so the final map() that flattened the tuples is no longer needed
            yield (id, item["cov_id"], item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
    except Exception as e:
        pass
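Part of why the original was hard to debug is the catch-all `except Exception as e: pass`: since `pls` is a list, `pls["pol_cat_id"]` raises a TypeError that is silently swallowed, so only the raw record from `yield(record)` ever comes out. A minimal sketch of the corrected generator without the catch-all, so such errors surface, is below. It unpacks the single-element lists directly in the assignment; the name `flatten_record` is hypothetical, and it produces the same tuples as the fixed `flatten_map` above.

```python
def flatten_record(record):
    """Yield one flat tuple per coverage dict; errors propagate instead of being swallowed."""
    # The 3rd and 4th elements are single-element lists, so unpack them in place.
    # This raises ValueError if either list ever holds zero or several dicts.
    rid, items, (pol,), (qor,) = record
    for item in items:
        yield (rid, item["cov_id"], item["cov_cd"], item["cov_amt"], item["cov_state"],
               pol["pol_cat_id"], pol["pol_dt"], qor["qor_pol_id"])

record = ('xxxxx86', [{'cov_id': 'R', 'cov_cd': '20', 'cov_amt': '100', 'cov_state': 'TX'},
                      {'cov_id': 'R', 'cov_cd': '44', 'cov_amt': '500', 'cov_state': 'TX'},
                      {'cov_id': 'R', 'cov_cd': '66', 'cov_amt': '50', 'cov_state': 'TX'}],
          [{'pol_cat_id': '532', 'pol_dt': '20091020'}],
          [{'qor_pol_id': '49320', 'qor_cd': '21'}])

rows = list(flatten_record(record))
for r in rows:
    print(r)
```

It would be used the same way on the cluster, e.g. `myrdd.flatMap(flatten_record)`. Letting exceptions propagate means a malformed record fails the job instead of disappearing quietly; if some bad records are expected, catching a specific exception type is safer than a bare catch-all.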