Search code examples
pythonamazon-web-servicesamazon-dynamodbaws-glue

How to write string set in dynamo with AWS Glue?


I need to copy data from one dynamo table to another and do some transformation along the way. For that, I exported data from the source table to s3 and ran crawler over it. In my Glue Job I'm using following code:

mapped = apply_mapping.ApplyMapping.apply(
    frame=source_df,
    mappings=[
        ("item.uuid.S", "string", "uuid", "string"),
        ("item.options.SS", "set", "options", "set"),
        ("item.updatedAt.S", "string", "updatedAt", "string"),
        ("item.createdAt.S", "string", "createdAt", "string")
    ],
    transformation_ctx='mapped'
)
df = mapped.toDF() //convert to spark df
// apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df') //convert to dynamic frame
glue_context.write_dynamic_frame_from_options(
    frame=target_df,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "eu-west-1",
        "dynamodb.output.tableName": "my-table",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

In the source dynamo table the options field is a String Set. In transformation, it remains untouched. However, in the target table is a list of strings:

"options": {
    "L": [
      {
        "S": "option A"
      },
      {
        "S": "option B"
      }
    ]
  }

Could anyone advise how to write a string set into DynamoDB using AWS Glue?


Solution

  • Unfortunately, I couldn't find a way to write string sets to DynamoDB using Glue interfaces. I've found some solutions using boto3 with Spark so here is my solution. I skipped the transformation part and simplified the example in general.

    # Load source data from catalog
    source_dyf = glue_context.create_dynamic_frame_from_catalog(
            GLUE_DB, GLUE_TABLE, transformation_ctx="source"
        )
    
    # Map dynamo attributes
    mapped_dyf = ApplyMapping.apply(
        frame=source_dyf,
        mappings=[
            ("item.uuid.S", "string", "uuid", "string"),
            ("item.options.SS", "set", "options", "set"),
            ("item.updatedAt.S", "string", "updatedAt", "string"),
            ("item.updatedAt.S", "string", "createdAt", "string")
        ],
        transformation_ctx='mapped'
    )
    
    
    def _putitem(items):
        resource = boto3.resource("dynamodb")
        table = resource.Table("new_table")
        with table.batch_writer() as batch_writer:
            for item in items:
                batch_writer.put_item(Item=item)
    
    
    df = mapped_dyf.toDF()
    # Apply spark transformations ...
    
    # save partitions to dynamo
    df.rdd.mapPartitions(_putitem).collect()
    

    Depends on your data volume you might want to increase the number of retries in boto3 or even change the mechanism. Also, you might want to play with DynamoDB Provisioning. I switched to on-demand to run this particular migration, but there is a catch