Search code examples
pyspark, amazon-dynamodb, aws-glue, aws-glue-spark

AWS Glue PySpark: remove a struct wrapper in an array but keep the data, and save into DynamoDB


A DynamoDB table is exported to S3, and an AWS Glue crawler crawls the S3 data. An AWS Glue job takes the crawled data as its source; here's the schema after it was transformed by MergeLineItems:

def MergeLineItems(rec):
    """Collect the inner "M" map of every lineItems element.

    Adds a "lineItems1" key to *rec* containing the list of "M" payloads
    taken from each element of rec["lineItems"], then returns the
    (mutated) record.
    """
    rec["lineItems1"] = {}
    rec["lineItems1"] = [item["M"] for item in rec["lineItems"]]
    return rec
  
# Apply MergeLineItems to every record of the DynamicFrame via Glue's Map
# transform (Transform0 is defined elsewhere in the job script).
mapped_dyF =  Map.apply(frame = Transform0, f = MergeLineItems)

The schema is like this:

    -- lineItems1: array
    |    |-- element: struct
    |    |    |-- price: struct
    |    |    |    |-- N: string
    |    |    |-- grade: struct
    |    |    |    |-- S: string
    |    |    |-- expectedAmount: struct
    |    |    |    |-- N: string
    |    |    |-- notifiedAmount: struct
    |    |    |    |-- N: string

When I run the AWS Glue job, the data saved into DynamoDB looks like this:

[
    {
        "M":
        {
            "expectedAmount":
            {
                "M":
                {
                    "N":
                    {
                        "S": "10"
                    }
                }
            },
            "grade":
            {
                "M":
                {
                    "S":
                    {
                        "S": "GradeAAA"
                    }
                }
            },
            "notifiedAmount":
            {
                "M":
                {
                    "N":
                    {
                        "S": "0"
                    }
                }
            },
            "price":
            {
                "M":
                {
                    "N":
                    {
                        "S": "2.15"
                    }
                }
            }
        }
    }
]

However, the data in the original DynamoDB table is different from this. How can I change the data into this format:

[
    {
        "M":
        {
            "expectedAmount":
            {
                "N": "10"
            },
            "notifiedAmount":
            {
                "N": "0"
            },
            "grade":
            {
                "S": "GradeAAA"
            },
            "price":
            {
                "N": "2.15"
            }
        }
    }
]

Solution

  • I got it working. Here's my answer:

    # Read the crawled table from the Glue Data Catalog as a DynamicFrame.
    DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "data", transformation_ctx = "DataSource0")
    
    # Keep only the exported DynamoDB list attribute "item.lineItems.L",
    # renamed to a top-level "lineItems" array field.
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("item.lineItems.L", "array", "lineItems", "array")], transformation_ctx = "Transform0")
    
    
    def MergeLineItems(rec):
        """Strip the DynamoDB-export type wrappers from each line item.

        Every element of rec["lineItems"] looks like
        {"M": {"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}, ...}}.
        Builds rec["lineItems1"]: a list of plain dicts in which numeric
        ("N") attributes become Decimal and string ("S") attributes become
        str, so Glue writes native DynamoDB values instead of nested maps.

        Returns the mutated record. Unlike the previous version, the
        original "lineItems" entries are copied rather than mutated in
        place, and an item missing one of the known fields is left as-is
        instead of raising KeyError.
        """
        # DynamoDB attribute-type tag and Python converter for each field.
        converters = {
            "expectedAmount": ("N", Decimal),
            "notifiedAmount": ("N", Decimal),
            "grade": ("S", str),
            "price": ("N", Decimal),
        }
        merged = []
        for item in rec["lineItems"]:
            # Shallow copy so the source record is not mutated; any extra
            # fields beyond the four known ones pass through unchanged.
            flat = dict(item["M"])
            for field, (tag, convert) in converters.items():
                if field in flat:
                    flat[field] = convert(flat[field][tag])
            merged.append(flat)
        rec["lineItems1"] = merged
        return rec
      
    # Run MergeLineItems over every record, then swap the cleaned
    # "lineItems1" field in for the original "lineItems".
    mapped_dyF =  Map.apply(frame = Transform0, f = MergeLineItems)
    mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
    mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems") 
    
    
    # Write the cleaned frame back to DynamoDB, using 100% of the table's
    # provisioned write throughput.
    glueContext.write_dynamic_frame_from_options(
        frame=mapped_dyF,
        connection_type="dynamodb",
        connection_options={
            "dynamodb.region": "us-east-1",
            "dynamodb.output.tableName": "mydb",
            "dynamodb.throughput.write.percent": "1.0"
        }
    )
    job.commit()