Tags: python, dataframe, apache-spark, pyspark, rdd

Spark Error when unpacking items from tuple in RDD


I wrote a script in a Jupyter notebook that reads an RDD and performs some operations on it. The script works fine in Jupyter.

rdd=   [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                  [{'pol_cat_id':'234','pol_dt':'20100220'}],
                  [{'qor_pol_id':'23492','qor_cd':'30'}]),

     ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                  [{'pol_cat_id':'532','pol_dt':'20091020'}],
                  [{'qor_pol_id':'49320','qor_cd':'21'}]) ]
              

def flatten_map(record):
    # Unpack items
    id, items, [line], [pls] = record
    pol_id = pls["pol_cat_id"]
    pol_dt = pls["pol_dt"]
    qor_id = pls["qor_pol_id"]
    for item in items:
        yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1


result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0]))

However, when converting to a Python script, I get an error:

2019-10-01 14:12:46,901:ERROR: id, items, [line], [pls] = record

2019-10-01 14:12:46,901:ERROR:ValueError: not enough values to unpack (expected 1, got 0)

Any suggestions? Is there a difference in how Python handles this in a notebook vs. a .py script?


Solution

  • There are just a couple of mix-ups in which list each value is taken from: pol_cat_id and pol_dt come from line, while qor_pol_id comes from pls.

    Please go through the following code:

    rdd = [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                      {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                      {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                      [{'pol_cat_id':'234','pol_dt':'20100220'}],
                      [{'qor_pol_id':'23492','qor_cd':'30'}]),
         ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                      {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                      {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                      [{'pol_cat_id':'532','pol_dt':'20091020'}],
                      [{'qor_pol_id':'49320','qor_cd':'21'}]) ]
    def flatten_map(record):
        # Unpack items
        id, items, [line], [pls] = record
        pol_id = line["pol_cat_id"]
        pol_dt = line["pol_dt"]
        qor_id = pls["qor_pol_id"]
        for item in items:
            yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
    result = spark.sparkContext.parallelize(rdd).flatMap(flatten_map).map(lambda x: x[0])
    result.collect()
    
    # OUTPUT
    [('xxxxx99', 'Q', '100', '100', 'AZ', '234', '20100220', '23492'),
     ('xxxxx99', 'Q', '33', '200', 'AZ', '234', '20100220', '23492'),
     ('xxxxx99', 'Q', '64', '10', 'AZ', '234', '20100220', '23492'),
     ('xxxxx86', 'R', '20', '100', 'TX', '532', '20091020', '49320'),
     ('xxxxx86', 'R', '44', '500', 'TX', '532', '20091020', '49320'),
     ('xxxxx86', 'R', '66', '50', 'TX', '532', '20091020', '49320')]
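
    As a side note, the original traceback ("not enough values to unpack (expected 1, got 0)") suggests that, when the script ran as a .py against the real input, at least one record did not have the expected shape: an empty policy or qor list makes the [line] or [pls] pattern fail with exactly that message. If that is the case, a defensive variant of the function can skip malformed records instead of failing. This is a minimal sketch under that assumption; flatten_map_safe is a hypothetical name:

    def flatten_map_safe(record):
        # Skip records that don't match the expected
        # (id, items, [policy_line], [qor_line]) shape, e.g. when the
        # policy or qor list is empty -- unpacking an empty list is what
        # raises "not enough values to unpack (expected 1, got 0)".
        if len(record) != 4 or not record[2] or not record[3]:
            return
        rec_id, items, pol_lines, qor_lines = record
        line, pls = pol_lines[0], qor_lines[0]
        for item in items:
            yield (rec_id, item["cov_id"], item["cov_cd"], item["cov_amt"],
                   item["cov_state"], line["pol_cat_id"], line["pol_dt"],
                   pls["qor_pol_id"]), 1

    result = spark.sparkContext.parallelize(rdd).flatMap(flatten_map_safe).map(lambda x: x[0])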