Tags: python, dataframe, dictionary, pyspark

PySpark code to convert a dictionary to a Spark DataFrame


I am trying to convert a dictionary into a Spark DataFrame, but all my values end up in a single row. I want the final result to be a Spark DataFrame with 3 rows, one for each unique_survey_id.

How can I do this in PySpark? Here is my dictionary:

    inferenced_df = {
        'unique_survey_id': ['0001', '0002', '0003'],
        'verbatim': ["My name is John", "I am 23 yrs old", "I live in US"],
        'classification_critical_process_fg': [0, 0, 0],
        'reason_critical_process_fg': [
            {"Customer's Issue": "I wish there were more providers ", 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "Although the issue is unresolved, So flag is 0"},
            {"Customer's Issue": 'I am trying to make a payment', 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "Although the issue is unresolved So flag is 0"},
            {"Customer's Issue": '', 'Status of Resolution': '', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'The review does not mention any issue or negative experience. So the flag is 0'},
        ],
        'classification_critical_technical_fg': ['No', 'No', 'No'],
        'reason_critical_technical_fg': ['The review mentions difficulty in finding provider.', 'The review mentions an unresolved issue ', 'The review does not mention any technical issues'],
        'classification_critical_crc_escalation_fg': ['Yes', 'Yes', 'No'],
        'reason_critical_crc_escalation_fg': ['The customer is expressing frustration.', 'The customer is expressing dissatisfaction', 'The review does not mention any unresolved issues.'],
        'classification_insight_experience_fg': ['Yes', 'No', 'Yes'],
        'reason_insight_experience_fg': ["The review mentions a suggestion", 'The review mentions an unresolved', "The review explicitly mentions positive feedback"],
        'classification_insight_process_fg': [0, 0, 0],
        'reason_insight_process_fg': [
            {"Customer's Issue": "I need a diabetic eye exam ", 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'Customer has just stated the issue.'},
            {"Customer's Issue": 'I am trying to make a payment ', 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'Customer has just stated the issue.'},
            {"Customer's Issue": '', 'Status of Resolution': '', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "The customer review does not mention any issue or negative experience."},
        ],
    }

Current output: a single row that contains all of the values.

The code is:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.getOrCreate()

    def alerts_inference(inferenced_df):
        schema = StructType([
            StructField("unique_survey_id", StringType(), True),
            StructField("verbatim", StringType(), True),
            StructField("classification_critical_crc_escalation_fg", StringType(), True),
            StructField("reason_critical_crc_escalation_fg", StringType(), True),
            StructField("classification_critical_technical_fg", StringType(), True),
            StructField("reason_critical_technical_fg", StringType(), True),
            StructField("classification_critical_process_fg", StringType(), True),
            StructField("reason_critical_process_fg", StringType(), True),
            StructField("classification_insight_experience_fg", StringType(), True),
            StructField("reason_insight_experience_fg", StringType(), True),
            StructField("classification_insight_process_fg", StringType(), True),
            StructField("reason_insight_process_fg", StringType(), True)
        ])
        # [inferenced_df] is a list holding ONE dict, i.e. one record,
        # so Spark builds exactly one row from it
        inferenced_df = spark.createDataFrame([inferenced_df], schema)
        return inferenced_df


**Expected output**: a DataFrame with 3 rows, one per unique_survey_id, each with its corresponding column values.

Solution

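  • You can create a Spark DataFrame from a pandas DataFrame. pd.DataFrame turns each dictionary key into a column and each list element into a row, so the result already has one row per unique_survey_id:

      import pandas as pd

      # Select the columns in schema order so each one lines up with its StructField
      pandas_df = pd.DataFrame(inferenced_df)[schema.fieldNames()]
      inferenced_df = spark.createDataFrame(pandas_df, schema)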
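The schema in the question declares every column as StringType, while classification_critical_process_fg and classification_insight_process_fg hold integers and the two reason_*_process_fg columns hold dicts. A minimal sketch, assuming you want to keep that all-string schema: convert every value to text first (dicts as JSON) so the string content of each cell is predictable, then build the pandas DataFrame. stringify_columns and alerts_inference_via_pandas are hypothetical helper names, and schema is assumed to be the StructType from the question.

```python
import json


def stringify_columns(data):
    """Return a copy of a column-oriented dict with every value converted to str.

    Dicts are serialised as JSON text and all other values with str(), so every
    column fits a schema that declares all fields as StringType.
    """
    def to_text(value):
        if isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    return {name: [to_text(v) for v in values] for name, values in data.items()}


def alerts_inference_via_pandas(inferenced_df, schema):
    # Imported here so stringify_columns can be used without pandas/PySpark installed
    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    pandas_df = pd.DataFrame(stringify_columns(inferenced_df))
    # Reorder the pandas columns to the schema's field order before converting
    pandas_df = pandas_df[schema.fieldNames()]
    return spark.createDataFrame(pandas_df, schema)
```

With this approach the reason_*_process_fg columns contain JSON strings, which can be parsed back later (for example with from_json) if you need the individual keys.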
    

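An alternative without pandas: the question's code produces one row because [inferenced_df] is a list holding a single dict, and Spark turns each list element into one row. Transposing the column lists into one tuple per survey fixes that. The sketch below is a minimal version under a few assumptions: columns_to_rows is a hypothetical helper, the global spark session is obtained with SparkSession.builder.getOrCreate(), and the schema is changed to match the data (IntegerType for the two *_process_fg flags, MapType(StringType(), StringType()) for the reason dicts) instead of StringType everywhere.

```python
def columns_to_rows(data, column_order):
    """Turn {column: [v0, v1, ...]} into [(row 0 values), (row 1 values), ...].

    Values in each tuple follow column_order so they line up with the schema.
    """
    lengths = {len(data[name]) for name in column_order}
    if len(lengths) > 1:
        raise ValueError(f"columns have different lengths: {sorted(lengths)}")
    # zip(*columns) transposes the column lists into row tuples
    return list(zip(*(data[name] for name in column_order)))


def alerts_inference(inferenced_df):
    # Imported inside the function so columns_to_rows works without PySpark installed
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        IntegerType, MapType, StringType, StructField, StructType,
    )

    spark = SparkSession.builder.getOrCreate()
    # Assumed type for the reason_*_process_fg values: dicts of str -> str
    reason_map = MapType(StringType(), StringType())
    schema = StructType([
        StructField("unique_survey_id", StringType(), True),
        StructField("verbatim", StringType(), True),
        StructField("classification_critical_crc_escalation_fg", StringType(), True),
        StructField("reason_critical_crc_escalation_fg", StringType(), True),
        StructField("classification_critical_technical_fg", StringType(), True),
        StructField("reason_critical_technical_fg", StringType(), True),
        StructField("classification_critical_process_fg", IntegerType(), True),
        StructField("reason_critical_process_fg", reason_map, True),
        StructField("classification_insight_experience_fg", StringType(), True),
        StructField("reason_insight_experience_fg", StringType(), True),
        StructField("classification_insight_process_fg", IntegerType(), True),
        StructField("reason_insight_process_fg", reason_map, True),
    ])
    rows = columns_to_rows(inferenced_df, schema.fieldNames())
    # One tuple per unique_survey_id -> one DataFrame row per survey
    return spark.createDataFrame(rows, schema)
```

With the dictionary from the question, alerts_inference(inferenced_df).count() should return 3. Building tuples in schema order avoids depending on how Spark matches dict keys or pandas columns to schema fields.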