
Unzip pyspark pipelineRDD of python dicts to pyspark Dataframe


I'm using flatMap to parse a dataframe and it is working fine, but I'm not able to reshape the final result into a multiple-column dataset. How can I parse this RDD? This is a sample row of my result after the flatMap:

[Row(XXXX-XXXX-XXXX-XXXXX-XXXXXX={'m_ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'pp_breaker_power_phase': 'L1_L2', 'pp_breaker_poles': 2, 'pp_breaker_panel_circuit_number': 2, 'cp_ci_id': None, 'cp_value': None, 'phase': 'L1', 'pole': 2})]

I'm passing a dataframe with the same columns that you see within the dict, and this is the function that I use with flatMap:

def get_poles_phases(row):
    """Expand a breaker row into one output row per pole.

    :param row: a pyspark.sql.Row with the breaker columns
    :return: a list of new rows, one per pole/phase pair
    """
    new_rows = []
    initial_pole = row.pp_breaker_panel_circuit_number
    phases = row.pp_breaker_power_phase.split('_')

    for i in range(row.pp_breaker_poles):
        temp = row.asDict()
        temp['phase'] = phases[i]
        temp['pole'] = initial_pole

        # Advance the circuit number for the next pole unless the
        # panel uses phase grouping.
        if row.cp_value != 'Phase Grouping':
            initial_pole += 2
        else:
            logger.error('Panel configuration not recognized.')

        new_rows.append(row(temp))

    return new_rows
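To make the fan-out logic easier to follow outside Spark, here is a plain-Python sketch of the same expansion using dicts instead of pyspark Rows (the sample values below are hypothetical, and the helper name `expand_poles_phases` is mine):

```python
# Plain-Python sketch of the flatMap function: each input record is
# expanded into one dict per pole, stepping the circuit number by 2
# unless the panel uses phase grouping.
def expand_poles_phases(rec):
    new_rows = []
    pole = rec['pp_breaker_panel_circuit_number']
    phases = rec['pp_breaker_power_phase'].split('_')

    for i in range(rec['pp_breaker_poles']):
        temp = dict(rec)          # stands in for row.asDict()
        temp['phase'] = phases[i]
        temp['pole'] = pole
        if rec['cp_value'] != 'Phase Grouping':
            pole += 2
        new_rows.append(temp)

    return new_rows

sample = {'pp_breaker_power_phase': 'L1_L2',
          'pp_breaker_poles': 2,
          'pp_breaker_panel_circuit_number': 2,
          'cp_value': None}
out = expand_poles_phases(sample)
# -> two records: phase 'L1' at pole 2, phase 'L2' at pole 4
```

The same shape applies inside Spark: `df.rdd.flatMap(get_poles_phases)` yields one element per returned row.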

I tried with a schema of StructFields, but it didn't work:

cols = [StructField('m_ci_id', StringType(), True),
        StructField('ci_id', StringType(), True),
        StructField('pp_breaker_power_phase', StringType(), True),
        StructField('pp_breaker_poles', StringType(), True),
        StructField('pp_breaker_panel_circuit_number', StringType(), True),
        StructField('cp_ci_id', StringType(), True),
        StructField('cp_value', StringType(), True),
        StructField('phase', StringType(), True),
        StructField('pole', StringType(), True)]

schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)

I also tried passing a list of column names:

poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])

I suspect this is not working because I'm getting an RDD with only one column, but I don't know how to parse that single dict out so the schema matches.


Solution

  • I figured it out:

    from pyspark.sql import Row
    poles_phases = poles_phases.map(lambda row: Row(**list(row.asDict().values())[0]))
    

    This builds a new Row by unpacking the value dict. After that, you can use

    poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])
    
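    The unpacking step is easier to see with plain dicts. The flatMap output wraps the payload dict under a single key (the ID), so the fix is to grab the dict's sole value and splat it into a `Row` (sample values below are hypothetical):

```python
# Each RDD element is effectively a one-field Row whose only value is
# the payload dict; asDict() on it gives a single-key dict like this:
wrapped = {'id-1': {'m_ci_id': 'id-1', 'phase': 'L1', 'pole': 2}}

# Take the sole value: the flat payload dict.
inner = list(wrapped.values())[0]
# Row(**inner) then produces a flat Row with one named field per key,
# which is what toDF() needs to line up with the column list.
```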

    If you have None values, schema inference can fail, so you need to declare the schema explicitly, e.g.:

    cols = [StructField('m_ci_id', StringType(), True),
            StructField('ci_id', StringType(), True),
            StructField('pp_breaker_power_phase', StringType(), True),
            StructField('pp_breaker_poles', StringType(), True),
            StructField('pp_breaker_panel_circuit_number', StringType(), True),
            StructField('cp_ci_id', StringType(), True),
            StructField('cp_value', StringType(), True),
            StructField('phase', StringType(), True),
            StructField('pole', StringType(), True)]
    
    schema = StructType(cols)
    poles_phases = poles_phases.toDF(schema)