Tags: amazon-web-services, amazon-dynamodb, amazon-data-pipeline

How to use Data Pipeline to copy data from one DynamoDB table to another when both have on-demand capacity


I have been copying data from one DynamoDB table to another using a pipeline.json definition. It works when the source table uses provisioned capacity, and it doesn't matter whether the destination is provisioned or on-demand. Now I want both of my tables set to on-demand capacity, but when I use the same template it no longer works. Is there any way to do this, or is support for on-demand tables still under development?

Here is my original, working pipeline definition:

{
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule",
            "occurrences": "1"
        },
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "schedule": {
                "ref": "DailySchedule"
            },
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "dependsOn": {
               "ref": "TableBackupActivity"
            },
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-emr-#{myDDBDestinationRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-emr-#{myDDBSourceRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForBackup"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}
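
For reference, this definition is registered and run with the standard Data Pipeline CLI calls, roughly as follows (the pipeline name, the returned pipeline ID, and the parameter values are placeholders):

# Register the pipeline once and note the returned pipeline ID.
aws datapipeline create-pipeline --name ddb-copy --unique-id ddb-copy-token

# Upload the definition above together with its parameter values.
aws datapipeline put-pipeline-definition \
    --pipeline-id df-EXAMPLE1234567890 \
    --pipeline-definition file://pipeline.json \
    --parameter-values \
        myS3LogsPath=s3://my-bucket/datapipeline-logs \
        myTempS3Folder=s3://my-bucket/ddb-copy-temp \
        myDDBSourceTableName=SourceTable \
        myDDBDestinationTableName=DestinationTable

# Start the on-demand run.
aws datapipeline activate-pipeline --pipeline-id df-EXAMPLE1234567890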

And here is the stack trace from the failed Data Pipeline execution when the source DynamoDB table is set to on-demand capacity:

at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.run(DynamoDbExport.java:79)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.main(DynamoDbExport.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)


Solution

  • The following definition worked for the export (DynamoDB to S3):

    {
        "objects": [
            {
                "id": "Default",
                "name": "Default",
                "scheduleType": "ONDEMAND",
                "pipelineLogUri": "#{myS3LogsPath}",
                "failureAndRerunMode": "CASCADE",
                "role": "DataPipelineDefaultRole",
                "resourceRole": "DataPipelineDefaultResourceRole"
            },
            {
                "id": "DDBSourceTable",
                "tableName": "#{myDDBSourceTableName}",
                "name": "DDBSourceTable",
                "type": "DynamoDBDataNode",
                "readThroughputPercent": "#{myDDBReadThroughputRatio}"
            },
            {
                "name": "S3TempLocation",
                "id": "S3TempLocation",
                "type": "S3DataNode",
                "directoryPath": "#{myTempS3Folder}/data"
            },
            {
                "subnetId": "subnet-id",
                "id": "EmrClusterForBackup",
                "name": "EmrClusterForBackup",
                "masterInstanceType": "m5.xlarge",
                "coreInstanceType": "m5.xlarge",
                "coreInstanceCount": "1",
                "releaseLabel": "emr-5.23.0",
                "region": "#{myDDBSourceRegion}",
                "terminateAfter": "10 Days",
                "type": "EmrCluster"
            },
            {
                "id": "TableBackupActivity",
                "name": "TableBackupActivity",
                "input": {
                    "ref": "DDBSourceTable"
                },
                "output": {
                    "ref": "S3TempLocation"
                },
                "runsOn": {
                    "ref": "EmrClusterForBackup"
                },
                "resizeClusterBeforeRunning": "true",
                "type": "EmrActivity",
                "maximumRetries": "2",
                "step": [
                    "s3://dynamodb-dpl-#{myDDBSourceRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
                ]
            }
        ],
        "parameters": [
            {
                "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
                "id" : "myS3LogsPath",
                "type" : "AWS::S3::ObjectKey",
                "description" : "S3 path for pipeline logs."
            },
            {
                "id": "myDDBSourceTableName",
                "type": "String",
                "description": "Source DynamoDB table name"
            },
            {
                "id": "myDDBSourceRegion",
                "type": "String",
                "description": "Region of the DynamoDB table",
                "default": "us-west-2"
            },
            {
                "id": "myDDBReadThroughputRatio",
                "type": "Double",
                "description": "DynamoDB read throughput ratio",
                "default": "1",
                "watermark": "Enter value between 0.1-1.0"
            },
            {
                "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
                "id": "myTempS3Folder",
                "type": "AWS::S3::ObjectKey",
                "description": "Temporary S3 folder"
            }
        ]
    }
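
    Since the import definition below reads from the same #{myTempS3Folder}/data prefix that this export writes to, it is worth confirming that the export output actually landed in S3 before activating the import. A quick check (bucket and prefix are placeholders):

        aws s3 ls s3://my-temp-bucket/ddb-copy-temp/data/ --recursive --human-readable --summarize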
    

    And the following worked for the import (S3 to DynamoDB):

    {
        "objects": [
            {
                "id": "Default",
                "name": "Default",
                "scheduleType": "ONDEMAND",
                "pipelineLogUri": "#{myS3LogsPath}",
                "failureAndRerunMode": "CASCADE",
                "role": "DataPipelineDefaultRole",
                "resourceRole": "DataPipelineDefaultResourceRole"
            },
            {
                "name": "S3TempLocation",
                "id": "S3TempLocation",
                "type": "S3DataNode",
                "directoryPath": "#{myTempS3Folder}/data"
            },
            {
                "id": "DDBDestinationTable",
                "tableName": "#{myDDBDestinationTableName}",
                "name": "DDBDestinationTable",
                "type": "DynamoDBDataNode",
                "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
            },
            {
                "subnetId": "subnet-id",
                "id": "EmrClusterForLoad",
                "name": "EmrClusterForLoad",
                "releaseLabel": "emr-5.23.0",
                "masterInstanceType": "m5.xlarge",
                "coreInstanceType": "m5.xlarge",
                "coreInstanceCount": "1",
                "region": "#{myDDBDestinationRegion}",
                "terminateAfter": "10 Days",
                "type": "EmrCluster"
            },
            {
                "id": "TableLoadActivity",
                "name": "TableLoadActivity",
                "runsOn": {
                    "ref": "EmrClusterForLoad"
                },
                "input": {
                    "ref": "S3TempLocation"
                },
                "output": {
                    "ref": "DDBDestinationTable"
                },
                "type": "EmrActivity",
                "maximumRetries": "2",
                "resizeClusterBeforeRunning": "true",
                "step": [
                    "s3://dynamodb-dpl-#{myDDBDestinationRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
                ]
            },
            {
                "dependsOn": {
                    "ref": "TableLoadActivity"
                },
                "name": "S3CleanupActivity",
                "id": "S3CleanupActivity",
                "input": {
                    "ref": "S3TempLocation"
                },
                "runsOn": {
                   "ref": "EmrClusterForLoad"
                },
                "type": "ShellCommandActivity",
                "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
            }
        ],
        "parameters": [
            {
                "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
                "id" : "myS3LogsPath",
                "type" : "AWS::S3::ObjectKey",
                "description" : "S3 path for pipeline logs."
            },
            {
                "id": "myDDBDestinationTableName",
                "type": "String",
                "description": "Target DynamoDB table name"
            },
            {
                "id": "myDDBWriteThroughputRatio",
                "type": "Double",
                "description": "DynamoDB write throughput ratio",
                "default": "1",
                "watermark": "Enter value between 0.1-1.0"
            },
            {
                "id": "myDDBDestinationRegion",
                "type": "String",
                "description": "Region of the DynamoDB table",
                "default": "us-west-2"
            },
            {
                "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
                "id": "myTempS3Folder",
                "type": "AWS::S3::ObjectKey",
                "description": "Temporary S3 folder"
            }
        ]
    }
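
    Each of these two definitions is registered and activated with the same CLI calls shown for the original pipeline (create-pipeline, put-pipeline-definition with the matching --parameter-values, then activate-pipeline); the only extra step is sequencing: the export pipeline's TableBackupActivity must reach FINISHED before the import pipeline is activated. A sketch, with placeholder pipeline IDs:

        # Check the export run; wait until TableBackupActivity shows FINISHED.
        aws datapipeline list-runs --pipeline-id df-EXPORT1234567890

        # Then kick off the import pipeline.
        aws datapipeline activate-pipeline --pipeline-id df-IMPORT1234567890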
    

    Also, the subnetId fields in both pipeline definitions are optional, but it is always a good idea to set them.
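
    If you do want to pin the EMR clusters to a subnet, the ID can be looked up with a plain EC2 call (the VPC ID here is a placeholder):

        aws ec2 describe-subnets \
            --filters Name=vpc-id,Values=vpc-0123456789abcdef0 \
            --query 'Subnets[].{Id:SubnetId,Az:AvailabilityZone}' \
            --output table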