A DynamoDB table is exported to S3, and an AWS Glue crawler crawls the S3 data. An AWS Glue job takes the crawled data as its source and transforms it with the MergeLineItems function below; the resulting schema is shown after the code:
def MergeLineItems(rec):
    """Collect the ``"M"`` payload of every line item into ``rec["lineItems1"]``.

    Each element of ``rec["lineItems"]`` is expected to be a mapping with an
    ``"M"`` key. The value stored under that key is collected as-is: it is
    neither copied nor converted, so any nested type wrappers it contains
    are kept.

    Args:
        rec: Mutable mapping for a single record. It must contain
            ``"lineItems"``, an iterable of mappings that each have an
            ``"M"`` key.

    Returns:
        The same ``rec`` object, with ``"lineItems1"`` set to the list of
        ``"M"`` values in their original order. ``"lineItems"`` is left
        in place.

    Raises:
        KeyError: For dict-like inputs, if ``"lineItems"`` or an element's
            ``"M"`` key is missing.
    """
    # Assign once, after the list is complete, so the key never holds a
    # placeholder value of a different type.
    rec["lineItems1"] = [item["M"] for item in rec["lineItems"]]
    return rec
# Run MergeLineItems over the records of Transform0 and keep the result.
mapped_dyF = Map.apply(frame=Transform0, f=MergeLineItems)
The resulting schema looks like this:
|-- lineItems1: array
|    |-- element: struct
|    |    |-- price: struct
|    |    |    |-- N: string
|    |    |-- grade: struct
|    |    |    |-- S: string
|    |    |-- expectedAmount: struct
|    |    |    |-- N: string
|    |    |-- notifiedAmount: struct
|    |    |    |-- N: string
When I run the AWS Glue job, the data that is saved into DynamoDB looks like this:
[
{
"M":
{
"expectedAmount":
{
"M":
{
"N":
{
"S": "10"
}
}
},
"grade":
{
"M":
{
"S":
{
"S": "GradeAAA"
}
}
},
"notifiedAmount":
{
"M":
{
"N":
{
"S": "0"
}
}
},
"price":
{
"M":
{
"N":
{
"S": "2.15"
}
}
}
}
}
]
However, the data in the original DynamoDB table is different from this. How can I change the output so that it looks like this instead:
[
{
"M":
{
"expectedAmount":
{
"N": "10"
},
"notifiedAmount":
{
"N": "0"
},
"grade":
{
"S": "GradeAAA"
},
"price":
{
"N": "2.15"
}
}
}
]
I got it working. Here's my answer:
# Read the "data" table of the "mydb" catalog database as the job's source.
DataSource0 = glueContext.create_dynamic_frame.from_catalog(
    database="mydb",
    table_name="data",
    transformation_ctx="DataSource0",
)

# Map the nested item.lineItems.L array onto a top-level "lineItems" field.
Transform0 = ApplyMapping.apply(
    frame=DataSource0,
    mappings=[("item.lineItems.L", "array", "lineItems", "array")],
    transformation_ctx="Transform0",
)
def MergeLineItems(rec):
    """Unwrap the typed fields of every line item into plain values.

    Each element of ``rec["lineItems"]`` is expected to be a mapping with an
    ``"M"`` key whose value holds the fields ``expectedAmount``,
    ``notifiedAmount`` and ``price`` (each wrapped as ``{"N": <text>}``) and
    ``grade`` (wrapped as ``{"S": <text>}``). The wrappers are replaced by
    ``Decimal`` values for the ``"N"`` fields and a ``str`` for ``grade``.

    The ``"M"`` mappings are updated in place and then collected, so any
    other fields they carry are kept unchanged.

    Args:
        rec: Mutable mapping for a single record. It must contain
            ``"lineItems"``, an iterable of mappings shaped as described
            above.

    Returns:
        The same ``rec`` object, with ``"lineItems1"`` set to the list of
        unwrapped ``"M"`` mappings in their original order.

    Raises:
        KeyError: For dict-like inputs, if ``"lineItems"``, an ``"M"`` key,
            one of the four fields, or its type tag is missing.
        decimal.InvalidOperation: If an ``"N"`` value is not a valid
            decimal literal.
    """
    # (field name, type tag that holds the raw value, converter), listed in
    # the order the fields are processed.
    field_specs = (
        ("expectedAmount", "N", Decimal),
        ("notifiedAmount", "N", Decimal),
        ("grade", "S", str),
        ("price", "N", Decimal),
    )

    merged = []
    for wrapper in rec["lineItems"]:
        item = wrapper["M"]
        for field, type_tag, convert in field_specs:
            # Replace the {tag: raw} wrapper with the converted bare value.
            item[field] = convert(item[field][type_tag])
        merged.append(item)

    rec["lineItems1"] = merged
    return rec
# Run MergeLineItems over the records of Transform0.
mapped_dyF = Map.apply(frame=Transform0, f=MergeLineItems)

# Replace the original column with the merged one: drop "lineItems", then
# give "lineItems1" its name.
mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems")

# Write the transformed frame to the DynamoDB table named in the options.
glueContext.write_dynamic_frame_from_options(
    frame=mapped_dyF,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    },
)

job.commit()