PySpark flatten RDD error: too many values to unpack

I'm trying to flatten data in an RDD. The RDD is structured as a list of 4-tuples: the first element is the primary_id, the second is a list of dictionaries, and the third and fourth are each a single-element list containing one dictionary.

rdd = [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                   [{'pol_cat_id':'234','pol_dt':'20100220'}],
                   [{'qor_pol_id':'23492','qor_cd':'30'}]),

       ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                   [{'pol_cat_id':'532','pol_dt':'20091020'}],
                   [{'qor_pol_id':'49320','qor_cd':'21'}])]

I want to flatten the data so that it appears in the format

[image: desired flattened format]

How would I do this in PySpark?

Here is what I have attempted, but it fails with the error "too many values to unpack":

def flatten_map(record):
    try:
        yield(record)
        # Unpack items
        id, items, line, pls = record
        pol_id = pls["pol_cat_id"]
        pol_dt = pls["pol_dt"]
        qor_id = pls["qor_pol_id"]
        for item in items:
            yield (id, item["cov_id"], item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
    except Exception as e:
        pass

result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0], ))

I can post the complete traceback if required, but for the sake of brevity, this is the key line:

ValueError: too many values to unpack (expected 2)

Note: converting to pandas is not an option because the RDD is too big.


  • IIUC, you can use flatMap() with a list comprehension that iterates over the 2nd item of each 4-item tuple (1 string + 3 lists), for example:

    from pyspark.sql import Row
    myrdd = sc.parallelize(rdd)
    myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
    #[({'primary_id': 'xxxxx99'},
    #  {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
    #  {'pol_cat_id': '234', 'pol_dt': '20100220'},
    #  {'qor_pol_id': '23492', 'qor_cd': '30'}),
    # ......

    Short explanation: the list comprehension inside flatMap() iterates over the 2nd item x[1] (each element z is a dictionary). It also converts the first item, the string x[0], into a dictionary with one entry, {"primary_id": x[0]}, and takes the first element of x[2] and of x[3], both of which are dictionaries.
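The expansion step can be checked locally without a Spark context. The sketch below is plain Python, not PySpark: `expand` is a hypothetical name for the same list comprehension, and `itertools.chain.from_iterable` stands in for what flatMap() does with the returned lists. The sample record uses the values shown in the output above.

```python
from itertools import chain

# One sample record: (primary_id, coverages, [policy dict], [qor dict]).
record = (
    "xxxxx99",
    [
        {"cov_id": "Q", "cov_cd": "100", "cov_amt": "100", "cov_state": "AZ"},
        {"cov_id": "Q", "cov_cd": "33", "cov_amt": "200", "cov_state": "AZ"},
        {"cov_id": "Q", "cov_cd": "64", "cov_amt": "10", "cov_state": "AZ"},
    ],
    [{"pol_cat_id": "234", "pol_dt": "20100220"}],
    [{"qor_pol_id": "23492", "qor_cd": "30"}],
)


def expand(x):
    """Hypothetical helper: the same list comprehension as the flatMap lambda."""
    return [({"primary_id": x[0]}, z, x[2][0], x[3][0]) for z in x[1]]


# chain.from_iterable concatenates the per-record lists, as flatMap would.
expanded = list(chain.from_iterable(expand(r) for r in [record]))

print(len(expanded))   # 3, one tuple per coverage dictionary
print(expanded[0][0])  # {'primary_id': 'xxxxx99'}
```

Each input record with n coverage dictionaries yields n tuples of 4 dictionaries, which is the shape the merge step expects.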

    After running the flatMap() above, each RDD element is a tuple of 4 dictionaries, so all that remains is to merge them. Below is sample code that maps each tuple of 4 dictionaries to a Row object; you may have to change how exceptions and missing fields are handled to fit your own requirements.

    cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']
    def merge_dict(arr, cols):
        # Merge every dictionary in the tuple into one flat dictionary
        row = {}
        for e in arr:
            if isinstance(e, dict): row.update(e)
        # Missing fields become None; an empty merge returns None and is filtered out below
        return Row(**{ c:row.get(c) for c in cols }) if row else None
    myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
       .map(lambda x: merge_dict(x, cols)) \
       .filter(bool) \
       .toDF() \
       .show()
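    +-------+------+------+---------+----------+--------+----------+------+----------+
    |cov_amt|cov_cd|cov_id|cov_state|pol_cat_id|  pol_dt|primary_id|qor_cd|qor_pol_id|
    +-------+------+------+---------+----------+--------+----------+------+----------+
    |    100|   100|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
    |    200|    33|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
    |     10|    64|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
    |    100|    20|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
    |    500|    44|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
    |     50|    66|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
    +-------+------+------+---------+----------+--------+----------+------+----------+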
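To see how the merge treats a missing field, here is a small plain-Python sketch. `merge_to_dict` is a hypothetical stand-in that returns an ordinary dict instead of a Row, so it runs without PySpark; the `qor_cd` entry is deliberately left out of the sample tuple.

```python
cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state',
        'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']


def merge_to_dict(parts, cols):
    """Hypothetical helper: merge the dicts in `parts`, keeping only `cols`."""
    merged = {}
    for part in parts:
        if isinstance(part, dict):  # silently skip anything that is not a dict
            merged.update(part)
    # Absent columns are filled with None; nothing to merge gives None.
    return {c: merged.get(c) for c in cols} if merged else None


parts = (
    {'primary_id': 'xxxxx86'},
    {'cov_id': 'R', 'cov_cd': '20', 'cov_amt': '100', 'cov_state': 'TX'},
    {'pol_cat_id': '532', 'pol_dt': '20091020'},
    {'qor_pol_id': '49320'},  # 'qor_cd' omitted on purpose
)

row = merge_to_dict(parts, cols)
print(row['qor_cd'])                      # None
print(merge_to_dict(('a', None), cols))   # None, no dictionaries to merge
```

Filling absent columns with None keeps every row the same width, which matters when the rows are later turned into a DataFrame.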

    BTW, if you want to make your original function work, check the following 5 lines marked with #<--:

    def flatten_map(record):
        try:
            #yield(record)    #<-- comment this out, no need for unprocessed data in the output
            # Unpack items
            id, items, line, pls = record
            pol_id = line[0]["pol_cat_id"]      #<-- from line[0] not pls
            pol_dt = line[0]["pol_dt"]          #<-- from line[0] not pls
            qor_id = pls[0]["qor_pol_id"]       #<-- from pls[0] not pls
            for item in items:
                #<-- the line below drops the trailing ", 1", so the last map() that flattened tuples is no longer needed
                yield (id, item["cov_id"], item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
        except Exception as e:
            pass    # placeholder: handle or log malformed records as needed
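Since the full traceback was not posted, which statement raised the ValueError is an assumption. The message itself is what Python produces whenever a sequence has more items than there are names on the left-hand side. The plain-Python sketch below reproduces it and shows one way to separate records that will not unpack into four names; `split_by_arity` is a hypothetical helper, not part of PySpark.

```python
def split_by_arity(records, expected=4):
    """Hypothetical helper: separate records with `expected` items from the rest."""
    good, bad = [], []
    for rec in records:
        if isinstance(rec, tuple) and len(rec) == expected:
            good.append(rec)
        else:
            bad.append(rec)
    return good, bad


# Unpacking a 4-tuple into 2 names raises the reported kind of error.
try:
    key, value = ("xxxxx99", [], [], [])
except ValueError as err:
    message = str(err)
print(message)  # begins with "too many values to unpack"

records = [
    ("xxxxx99", [], [{}], [{}]),  # 4 items: unpacks into id, items, line, pls
    ("xxxxx86", []),              # 2 items: would fail a 4-name unpack
]
good, bad = split_by_arity(records)
print(len(good), len(bad))  # 1 1
```

On the RDD itself, a similar check could be written as `myrdd.filter(lambda r: len(r) != 4).take(5)` to inspect a few offending records before flattening.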