I created a Step Functions state machine that creates an EMR cluster. I want the date in the steps to change according to the date on which I execute the state machine: if I run it today, 13.6.2023, I want it to process the day before, 12.6.2023. How can I do that? This is my code:
{
  "Comment": "A description of my state machine",
  "StartAt": "EMR CreateCluster",
  "States": {
    "EMR CreateCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "IOretrieve",
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ReleaseLabel": "emr-6.8.0",
        "Applications": [
          {
            "Name": "Spark"
          }
        ],
        "LogUri": "s3://",
        "VisibleToAllUsers": true,
        "Instances": {
          "Ec2SubnetId": "subnet",
          "Ec2KeyName": "",
          "EmrManagedMasterSecurityGroup": "",
          "EmrManagedSlaveSecurityGroup": "",
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "Name": "Master",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m5.xlarge"
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "Name": "CORE",
              "TargetOnDemandCapacity": 5,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "r5.2xlarge"
                }
              ]
            }
          ]
        },
        "BootstrapActions": [
          {
            "Name": "Custom action",
            "ScriptBootstrapAction": {
              "Path": "s3://",
              "Args": []
            }
          }
        ],
        "Configurations": [
          {
            "Classification": "core-site",
            "Properties": {
              "fs.s3a.connection.maximum": "1000"
            }
          },
          {
            "Classification": "spark",
            "Properties": {
              "maximizeResourceAllocation": "true"
            }
          }
        ]
      },
      "ResultPath": "$.cluster",
      "Next": "Run first step"
    },
    "Run first step": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
          "Name": "My first EMR step",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "spark-submit",
              "--deploy-mode",
              "client",
              "s3://",
              "--local_run",
              "False",
              "--date_path",
              "year=2023/month=06/day=12/"
            ]
          }
        }
      },
      "ResultPath": "$.firstStep",
      "Next": "Run second step"
    },
    "Run second step": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
          "Name": "My second EMR step",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "spark-submit",
              "--deploy-mode",
              "client",
              "s3://",
              "--local_run",
              "False",
              "--date_path",
              "year=2023/month=06/day=12/"
            ]
          }
        }
      },
      "ResultPath": "$.secondStep",
      "Next": "EMR TerminateCluster"
    },
    "EMR TerminateCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId"
      },
      "End": true
    }
  }
}
The date path is what I want to change: "--date_path", "year=2023/month=06/day=12/"
AWS Step Functions provides some simple Intrinsic Functions for math operations, like States.MathRandom and States.MathAdd.
However, at the time of writing (June 2023), more complex calculations, like getting the date of the previous day, can't be done out-of-the-box and require invoking an external process, i.e. a Lambda function.
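If you do need the previous day rather than the current one, a minimal Lambda handler could compute it and return the formatted partition path for the state machine to consume. This is a hypothetical sketch; the `datePath` field name is an assumption, not an AWS convention:

```python
from datetime import datetime, timedelta, timezone

def lambda_handler(event, context):
    # Compute yesterday's date in UTC and format it as a Hive-style
    # partition path, e.g. "year=2023/month=06/day=12/".
    # The "datePath" key is a naming assumption for this sketch.
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return {"datePath": yesterday.strftime("year=%Y/month=%m/day=%d/")}
```

The state machine would invoke it with a Task state using the `arn:aws:states:::lambda:invoke` resource and read the result from the state output.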
That said, you can retrieve and format the current date and time by following the steps below.
Step 1:
Retrieve the execution time of the current state from the Context Object using $$.State.EnteredTime. This returns the date and time in the following format:
2019-03-26T20:14:13.192Z
Step 2:
Split the execution time into an array using States.StringSplit:
States.StringSplit($$.State.EnteredTime, '-,T')
This returns the following array:
[
"2019",
"03",
"26",
"20:14:13.192Z"
]
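For intuition, States.StringSplit treats every character of its second argument as an individual delimiter. That behavior can be mimicked locally in Python (a sketch for illustration, not AWS code):

```python
import re

def string_split(value: str, delimiters: str) -> list:
    # Each character in `delimiters` acts as a separate split character,
    # mirroring the behavior of the States.StringSplit intrinsic.
    return [part for part in re.split(f"[{re.escape(delimiters)}]", value) if part]

string_split("2019-03-26T20:14:13.192Z", "-,T")
# → ["2019", "03", "26", "20:14:13.192Z"]
```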
Step 3:
Format the date path string using States.Format with the first three elements of the array (this assumes the array from Step 2 was stored at $.date.splitDate):
States.Format('year={}/month={}/day={}', States.ArrayGetItem($.date.splitDate, 0), States.ArrayGetItem($.date.splitDate, 1), States.ArrayGetItem($.date.splitDate, 2))
Step 4:
Create the Args array using States.Array:
States.Array('spark-submit', '--deploy-mode', 'client', 's3://', '--local_run', 'False', '--date_path', $.datePath)
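Put together, Steps 2–4 amount to the following transformation, sketched here in Python for clarity (the script path is left as the truncated "s3://" placeholder from the original):

```python
def build_args(entered_time: str, script_path: str) -> list:
    # Split the timestamp on '-' and 'T' and keep year, month, day
    # (Steps 2-3), then assemble the spark-submit argument array (Step 4).
    year, month, day = entered_time.replace("T", "-").split("-")[:3]
    date_path = f"year={year}/month={month}/day={day}/"
    return ["spark-submit", "--deploy-mode", "client", script_path,
            "--local_run", "False", "--date_path", date_path]

build_args("2019-03-26T20:14:13.192Z", "s3://")
# → ["spark-submit", "--deploy-mode", "client", "s3://",
#    "--local_run", "False", "--date_path", "year=2019/month=03/day=26/"]
```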
To show how this works in the context of your state machine, I've added an additional Pass state, "Format date path", and replaced the HadoopJarStep.Args attribute in your "Run second step" task state:
{
  "Comment": "A description of my state machine",
  "StartAt": "EMR CreateCluster",
  "States": {
    "EMR CreateCluster": {
      ...,
      "Next": "Format date path"
    },
    "Format date path": {
      "Type": "Pass",
      "Parameters": {
        "cluster.$": "$.cluster",
        "datePath.$": "States.Format('year={}/month={}/day={}', States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 0), States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 1), States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, '-,T'), 2))"
      },
      "Next": "Run second step"
    },
    "Run second step": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
          "Name": "My second EMR step",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args.$": "States.Array('spark-submit', '--deploy-mode', 'client', 's3://', '--local_run', 'False', '--date_path', $.datePath)"
          }
        }
      },
      "ResultPath": "$.secondStep",
      "Next": "EMR TerminateCluster"
    },
    "EMR TerminateCluster": {
      ...
    }
  }
}
Note that the Pass state's Parameters also carry $.cluster forward ("cluster.$": "$.cluster"); without that, the Pass output would contain only datePath, and the following states could no longer resolve $.cluster.ClusterId.