Search code examples
aws-gluepysparkaws-glue-spark

add missing column to AWS Glue DataFrame


I am reading a DynamoDB Table with Glue, due to the dynamic schema it can happen that some columns are not existing. Adding them works fine with the following code but I am not sure how to make the function dynamic if I need to add multiple columns.

# add missing columns if not available
def AddCustRegName(r):
    r["customerRegistrationName"] = ""  # add column with empty string.
    return r

if addCustRegName:
    case_df_final = Map.apply(frame=case_df_final, f=AddCustRegName)

Any suggestions?

The following code is failing with the below error

# add missing columns if not available
def AddColumn(r, col):
    r[col] = ""  # add column with empty string.
    return r

case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))

case_df_final.toDF().printSchema()

Fail to execute line 6: case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded')) Traceback (most recent call last): File "/tmp/zeppelin_pyspark-4928209310219195923.py", line 375, in exec(code, _zcUserQueryNameSpace) File "", line 6, in File "", line 3, in AddColumn TypeError: 'DynamicFrame' object does not support item assignment


Solution

  • The function you pass in Map can have only one argument :

    f – The function to apply to all DynamicRecords in the DynamicFrame. The function must take a DynamicRecord as an argument and return a new DynamicRecord produced by the mapping (required).

    However, you can do it on pyspark data frame instead of DynamicFrame :

    from pyspark.sql import functions as F
    
    def AddColumn(sdf, new_col):
        return sdf.withColumn(new_col, F.lit(""))
    
    case_sdf_final = AddColumn(case_df_final.toDF(), "accessoryTaxIncluded")
    
    case_sdf_final.printSchema()
    

    Or if you have a list of columns to add you can use functools.reduce like this:

    import functools
    
    new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]
    
    case_sdf_final = functools.reduce(
        lambda acc, c: AddColumn(acc, c),
        new_cols,
        case_df_final.toDF()
    )
    
    case_sdf_final.printSchema()
    

    Then go back to DynamicFrame:

    case_df_final = DynamicFrame.fromDF(case_sdf_final, glueContext, "case_df_final")