
Is there a way to get Step Functions input values into EMR step Args?


We are running batch Spark jobs on AWS EMR clusters. The jobs run periodically, and we would like to orchestrate them with AWS Step Functions.

Since November 2019, Step Functions has supported EMR natively. When adding a step to the cluster, we can use the following config:

"Some Step": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
    "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
            "Name": "FirstStep",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--class",
                    "com.some.package.Class",
                    "JarUri",
                    "--startDate",
                    "$.time",
                    "--daysToLookBack",
                    "$.daysToLookBack"
                ]
            }
        }
    },
    "Retry": [
        {
            "ErrorEquals": [ "States.ALL" ],
            "IntervalSeconds": 1,
            "MaxAttempts": 1,
            "BackoffRate": 2.0
        }
    ],
    "ResultPath": "$.firstStep",
    "End": true
}

Within the Args list of the HadoopJarStep, we would like to set arguments dynamically. For example, if the input of the state machine execution is:

{
    "time": "2020-01-08",
    "daysToLookBack": 2
}

The strings in the config that start with "$." should be replaced with the corresponding input values when the state machine executes, so the step on the EMR cluster should run:

command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2

Instead, it runs:

command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack
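The observed behavior follows from how Parameters resolves paths: only a field whose name ends in ".$" has its value treated as a path, and a plain string inside an array has no field name at all. Below is a simplified Python model of that rule, written only for illustration. It is not the service's implementation, it supports only simple dotted paths such as "$.time", and the function names resolve_path and resolve_parameters are hypothetical.

```python
# Simplified, hypothetical model of Step Functions Parameters resolution.
# Only simple dotted paths like "$.a.b" are supported; the real service
# implements full JsonPath.
from typing import Any


def resolve_path(path: str, data: dict) -> Any:
    """Follow a simple dotted path such as "$.cluster.ClusterId"."""
    node: Any = data
    for key in path[2:].split("."):  # drop the leading "$."
        node = node[key]
    return node


def resolve_parameters(template: Any, data: dict) -> Any:
    """Resolve fields whose names end in ".$"; copy every other value literally."""
    if isinstance(template, dict):
        out = {}
        for key, value in template.items():
            if key.endswith(".$"):
                out[key[:-2]] = resolve_path(value, data)  # strip ".$" from the name
            else:
                out[key] = resolve_parameters(value, data)
        return out
    if isinstance(template, list):
        # Plain strings in a list have no field name, so they are never
        # treated as paths; objects nested in the list are still resolved.
        return [resolve_parameters(item, data) for item in template]
    return template


execution_input = {"time": "2020-01-08", "daysToLookBack": 2}

# The Args list from the question: "$.time" is just a literal string element.
literal_args = resolve_parameters(
    {"Args": ["--startDate", "$.time", "--daysToLookBack", "$.daysToLookBack"]},
    execution_input,
)
print(literal_args["Args"])
# ['--startDate', '$.time', '--daysToLookBack', '$.daysToLookBack']

# Wrapping a dynamic element in an object with a ".$" field does resolve it.
wrapped_args = resolve_parameters(
    {"Args": [{"Arg1": "--startDate"}, {"Arg2.$": "$.time"}]},
    execution_input,
)
print(wrapped_args["Args"])
# [{'Arg1': '--startDate'}, {'Arg2': '2020-01-08'}]
```

Wrapping each dynamic value in an object with a ".$" field is what makes per-element substitution possible. The wrapped objects then have to be flattened back into a list of strings before EMR sees them.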

Does anyone know if there is a way to do this?


Solution

  • Parameters lets you define key-value pairs, and only a field whose name ends in ".$" has its value resolved as a path. Because the value of the "Args" key is an array of plain strings, you can't dynamically reference a specific element inside it; you have to reference the whole array instead, for example "Args.$": "$.Input.ArgsArray".

    So for your use case, the best way to achieve this is to add a pre-processing state before this one. In the pre-processing state, you can either call a Lambda function and format the input/output in code, or, for something as simple as inserting dynamic values into an array, use a Pass state to reshape the data. Then, in the Parameters of your Task state, use JSONPath to fetch the array you built in the pre-processing state. The example below uses Pass states throughout so it can run without a cluster; in a real state machine, "Some Step" would be the Task state with the elasticmapreduce:addStep.sync resource from the question. Here's an example:

    {
        "Comment": "A Hello World example of the Amazon States Language using Pass states",
        "StartAt": "HardCodedInputs",
        "States": {
            "HardCodedInputs": {
                "Type": "Pass",
                "Parameters": {
                    "cluster": {
                        "ClusterId": "ValueForClusterIdVariable"
                    },
                    "time": "ValueForTimeVariable",
                    "daysToLookBack": "ValueFordaysToLookBackVariable"
                },
                "Next": "Pre-Process"
            },
            "Pre-Process": {
                "Type": "Pass",
                "Parameters": {
                    "FormattedInputsForEmr": {
                        "ClusterId.$": "$.cluster.ClusterId",
                        "Args": [
                            { "Arg1": "spark-submit" },
                            { "Arg2": "--class" },
                            { "Arg3": "com.some.package.Class" },
                            { "Arg4": "JarUri" },
                            { "Arg5": "--startDate" },
                            { "Arg6.$": "$.time" },
                            { "Arg7": "--daysToLookBack" },
                            { "Arg8.$": "$.daysToLookBack" }
                        ]
                    }
                },
                "Next": "Some Step"
            },
            "Some Step": {
                "Type": "Pass",
                "Parameters": {
                    "ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
                    "Step": {
                        "Name": "FirstStep",
                        "ActionOnFailure": "CONTINUE",
                        "HadoopJarStep": {
                            "Jar": "command-runner.jar",
                            "Args.$": "$.FormattedInputsForEmr.Args[*][*]"
                        }
                    }
                },
                "End": true
            }
        }
    }
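If you take the Lambda route instead of a Pass state, the pre-processing function can build the flat argument list directly, so no [*][*] flattening is needed afterwards. The following is a minimal sketch, assuming a Python Lambda runtime, the input shape from the question, and a Task state that invokes it with a ResultPath of "$.FormattedInputsForEmr" (that wiring, and the cluster ID "j-EXAMPLE", are hypothetical). The EMR step could then use "ClusterId.$": "$.FormattedInputsForEmr.ClusterId" and "Args.$": "$.FormattedInputsForEmr.Args". The sketch converts every value with str() because the EMR API types HadoopJarStep Args as a list of strings. Passing daysToLookBack through as the number 2, as the Pass-state version would do with the question's input, may be rejected.

```python
# Hypothetical pre-processing Lambda handler: builds the full EMR Args list
# from the execution input. "JarUri" and the class name are the question's
# placeholders, not real values.
from typing import Any


def lambda_handler(event: dict, context: Any = None) -> dict:
    args = [
        "spark-submit",
        "--class",
        "com.some.package.Class",
        "JarUri",
        "--startDate",
        str(event["time"]),
        "--daysToLookBack",
        # EMR expects Args as strings, so convert numeric input explicitly.
        str(event["daysToLookBack"]),
    ]
    return {"ClusterId": event["cluster"]["ClusterId"], "Args": args}


if __name__ == "__main__":
    sample_event = {
        "cluster": {"ClusterId": "j-EXAMPLE"},
        "time": "2020-01-08",
        "daysToLookBack": 2,
    }
    print(lambda_handler(sample_event)["Args"])
```

Compared with the Pass-state approach, this costs a Lambda invocation per execution, but it keeps the argument-building logic in ordinary code, where type conversion and validation are easy to add.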