I am reading a DynamoDB table with Glue. Because the schema is dynamic, some columns may not exist. Adding them works fine with the following code, but I am not sure how to make the function dynamic if I need to add multiple columns.
# add missing columns if not available
def AddCustRegName(r):
    r["customerRegistrationName"] = ""  # add column with empty string.
    return r
if addCustRegName:
    case_df_final = Map.apply(frame=case_df_final, f=AddCustRegName)
Any suggestions?
The following code fails with the error below:
# add missing columns if not available
def AddColumn(r, col):
    r[col] = ""  # add column with empty string.
    return r
case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))
Fail to execute line 6: case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4928209310219195923.py", line 375, in
    exec(code, _zcUserQueryNameSpace)
  File "", line 6, in
  File "", line 3, in AddColumn
TypeError: 'DynamicFrame' object does not support item assignment
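The function you pass to Map.apply can take only one argument. From the documentation:
f – The function to apply to all DynamicRecords in the DynamicFrame. The function must take a DynamicRecord as an argument and return a new DynamicRecord produced by the mapping (required).
Your call fails because AddColumn(case_df_final, 'accessoryTaxIncluded') runs immediately with the whole DynamicFrame as r, instead of handing Map a function to run on each record.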
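If you would rather stay with Map, one way around the single-argument limit is to build the function with a closure. This is only a minimal sketch, assuming records behave like dictionaries (as the r[col] assignment in the question suggests). The names make_add_columns and add_missing are hypothetical, and a plain dict stands in for a DynamicRecord so the snippet runs outside Glue:

```python
def make_add_columns(cols, default=""):
    """Return a one-argument function that fills in any missing columns."""
    def add_columns(record):
        for col in cols:
            # Only add the column when the record does not already have it.
            if col not in record:
                record[col] = default
        return record
    return add_columns


# Hypothetical column list; adjust to the columns your table may lack.
add_missing = make_add_columns(["customerRegistrationName", "accessoryTaxIncluded"])

# A plain dict used as a stand-in for a single DynamicRecord.
sample = add_missing({"caseId": "1", "accessoryTaxIncluded": "Y"})

# In a Glue job the same function would be passed to Map (not executed here):
# case_df_final = Map.apply(frame=case_df_final, f=add_missing)
```

Because the column list is captured when the function is built, Map still receives a function of one argument, and existing values are left untouched.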
Alternatively, you can add the column on a PySpark DataFrame instead of the DynamicFrame:
from pyspark.sql import functions as F
def AddColumn(sdf, new_col):
    return sdf.withColumn(new_col, F.lit(""))
case_sdf_final = AddColumn(case_df_final.toDF(), "accessoryTaxIncluded")
Or, if you have a list of columns to add, you can use functools.reduce like this:
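import functools
new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]
# Fold AddColumn over new_cols, starting from the Spark DataFrame.
case_sdf_final = functools.reduce(
    lambda acc, c: AddColumn(acc, c),
    new_cols,
    case_df_final.toDF()
)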
Then go back to DynamicFrame:
from awsglue.dynamicframe import DynamicFrame
case_df_final = DynamicFrame.fromDF(case_sdf_final, glueContext, "case_df_final")
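For reference, the functools.reduce step is just a loop written as a fold. Here is a small sketch with plain dictionaries standing in for the Spark DataFrame. The helper add_key is hypothetical and returns a new object on each step, as withColumn does:

```python
import functools


def add_key(acc, key):
    # Return a new dict instead of mutating, mirroring how withColumn
    # returns a new DataFrame rather than changing the existing one.
    return {**acc, key: ""}


start = {"caseId": "1"}  # stand-in for case_df_final.toDF()
new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]

# Fold: add_key(add_key(start, new_cols[0]), new_cols[1])
folded = functools.reduce(add_key, new_cols, start)

# The same thing as an explicit loop.
looped = start
for c in new_cols:
    looped = add_key(looped, c)
```

Both forms produce the same result, so use whichever reads better in your job script.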