Search code examples
amazon-web-servicesamazon-emraws-step-functions

Using date variable in a step function AWS


I created a step function for creating EMR cluster I want that the date in the steps to change according to the date I execute my step function. (if I run it today - 13.6.2023, I want it to run day before 12.6.2023) how can I do it? This is my code:

{
  "Comment": "A description of my state machine",
  "StartAt": "EMR CreateCluster",
  "States": {
    "EMR CreateCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "IOretrieve",
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ReleaseLabel": "emr-6.8.0",
        "Applications": [
          {
            "Name": "Spark"
          }
        ],
        "LogUri": "s3://",
        "VisibleToAllUsers": true,
        "Instances": {
          "Ec2SubnetId": "subnet",
          "Ec2KeyName": "",
          "EmrManagedMasterSecurityGroup": "",
           "EmrManagedSlaveSecurityGroup": "",
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "Name": "Master",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m5.xlarge"
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "Name": "CORE",
              "TargetOnDemandCapacity": 5,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "r5.2xlarge"
                }
              ]
            }
          ]
        },
        "BootstrapActions": [
            {
              "Name": "Custom action",
              "ScriptBootstrapAction": {
                "Path": "s3://",
                "Args": []
              }
            }
          ],
        "Configurations": [
            {
                "Classification": "core-site",
                "Properties": {
                    "fs.s3a.connection.maximum": "1000"
                }
            },
            {
                "Classification": "spark",
                "Properties": {
                    "maximizeResourceAllocation": "true"
                }
            }
        ]
      },
      "ResultPath": "$.cluster",
      "Next": "Run first step"
    },
    "Run first step": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
          "Name": "My first EMR step",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "spark-submit",
              "--deploy-mode",
              "client",
              "s3://",
              "--local_run",
              "False",
              "--date_path",
              "year=2023/month=06/day=12/"
            ]
          }
        }
      },
      "ResultPath": "$.firstStep",
      "Next": "Run second step"
    },
    "Run second step": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
          "Name": "My second EMR step",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "spark-submit",
              "--deploy-mode",
              "client",
              "s3://",
              "--local_run",
              "False",
              "--date_path",
              "year=2023/month=06/day=12/"
            ]
          }
        }
      },
      "ResultPath": "$.secondStep",
      "Next": "EMR TerminateCluster"
    },
    "EMR TerminateCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId"
      },
      "End": true
    }
  }
}

The date path is what I want to change: "--date_path", "year=2023/month=06/day=12/"


Solution

  • AWS Step Functions provides some simple Intrinsic Functions for math operations, like States.MathRandom and States.MathAdd.

    However, at the time of writing (June 2023), more complex calculations, like getting the date of the previous day can't be done out-of-the-box and require invoking an external process, i.e. a Lambda function.


    Having that said, you can retrieve and format the current date and time following the steps below.

    Step 1:

    Retrieve the execution time of the specific step from the Context Object using

    $$.State.EnteredTime
    

    This will return the date and time using the following format:

    2019-03-26T20:14:13.192Z
    

    Step 2:

    Split the execution time into an array using States.StringSplit:

    States.StringSplit($$.State.EnteredTime, '-,T')
    

    This will return the following Array:

    [
      "2019",
      "03",
      "26",
      "20:14:13.192Z"
    ]`
    

    Step 3:

    Format the date path string using States.Format with the first three elements of the array:

    States.Format('year={}/month={}/day={}', States.ArrayGetItem($.date.splitDate, 0), States.ArrayGetItem($.date.splitDate, 1), States.ArrayGetItem($.date.splitDate, 2))
    

    Step 4:

    Create the Args array using States.Array:

    States.Array('spark-submit', '--deploy-mode', 'client', 's3://', '--local_run', 'False', '--date_path',$.datePath)
    

    To show you how this works in the context of your state machine, I've 1/ added an additional pass state called "Format date path": {...} to your state machine, and replaced the HadoopJarStep.Args attribute in your task state ("Run second step": {...}):

    {
      "Comment": "A description of my state machine",
      "StartAt": "EMR CreateCluster",
      "States": {
        "EMR CreateCluster": {
          ...,
          "Next": "Format date path"
        },
        "Format date path": {
          "Type": "Pass",
          "Parameters": {
            "datePath.$": "States.Format('year={}/month={}/day={}', States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 0), States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 1), States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 2))"
          },
          "Next": "Run second step"
        },
        "Run second step": {
          "Type": "Task",
          "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
          "Parameters": {
            "ClusterId.$": "$.cluster.ClusterId",
            "Step": {
              "Name": "My second EMR step",
              "ClusterId.$": "$.cluster.ClusterId",
              "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args.$": "States.Array('spark-submit', '--deploy-mode', 'client', 's3://', '--local_run', 'False', '--date_path',$.datePath)"
              }
            }
          },
          "ResultPath": "$.secondStep",
          "Next": "EMR TerminateCluster"
        },
        "EMR TerminateCluster": {
          ...
        }
      }
    }