Tags: python, pyspark, apache-spark-sql, pyspark-schema

How to flatten nested struct using PySpark?


Link to dataset https://drive.google.com/file/d/1-xOpd2B7MDgS1t4ekfipHSerIm6JMz9e/view?usp=sharing

(The question included a screenshot of the nested schema.) Thanks in advance.


Solution

  • While I agree with Phantoms that flattening a DataFrame is fairly basic, if you still haven't figured it out, you can use the function below to flatten your DataFrame:

    def flattenNestedData(nestedDF):
        from pyspark.sql.functions import col, explode_outer
        from pyspark.sql.types import StructType, ArrayType
        try:
            # Fetch the complex (struct/array) columns from the schema
            fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                               if isinstance(field.dataType, (ArrayType, StructType))])
            while len(fieldNames) != 0:
                fieldName = list(fieldNames.keys())[0]
                print("Processing: " + fieldName + " Type: " + str(type(fieldNames[fieldName])))
                if isinstance(fieldNames[fieldName], StructType):
                    # Promote each struct field to a top-level column named parent_child
                    extractedFields = [col(fieldName + '.' + innerColName).alias(fieldName + "_" + innerColName)
                                       for innerColName in [colName.name for colName in fieldNames[fieldName]]]
                    nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)
                elif isinstance(fieldNames[fieldName], ArrayType):
                    # Explode arrays into one row per element, keeping rows with null/empty arrays
                    nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))
                # Re-scan the schema; loop until no complex columns remain
                fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                                   if isinstance(field.dataType, (ArrayType, StructType))])
            return nestedDF

        except Exception as err:
            raise Exception("Error occurred while flattening the dataframe: " + str(err))

    You can remove the ArrayType check if you don't want those columns exploded.
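    The struct-flattening half of the loop can be illustrated without Spark. The sketch below is a hypothetical helper (`flatten_record` is not part of PySpark) that applies the same idea to plain Python dicts: repeatedly promote nested keys to top-level `parent_child` keys until nothing nested remains. Lists are left untouched here, whereas the Spark version explodes them into extra rows.

    ```python
    def flatten_record(record):
        """Flatten nested dicts into parent_child keys, mirroring the struct case above."""
        flat = dict(record)
        # Keep rescanning until no value is itself a dict, like the while loop in the answer
        while any(isinstance(v, dict) for v in flat.values()):
            for key in list(flat.keys()):
                value = flat[key]
                if isinstance(value, dict):
                    # Promote each inner field to a top-level "parent_child" key
                    for inner_key, inner_value in value.items():
                        flat[key + "_" + inner_key] = inner_value
                    # Drop the original nested column, like .drop(fieldName)
                    del flat[key]
        return flat

    row = {"id": 1, "name": {"first": "Ada", "last": {"family": "Lovelace"}}}
    print(flatten_record(row))
    # {'id': 1, 'name_first': 'Ada', 'name_last_family': 'Lovelace'}
    ```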