amazon-web-services, amazon-sqs, aws-step-functions

SQS - Step function pipe failed


I am new to Step Functions and have a Pipe that polls an SQS queue and starts a state machine execution for each message received. The intention is to process one message at a time. However, the execution fails at the very first state with the exception below. I'm not sure why "ResultPath": "$.ExecutionDetails" doesn't work.

States.ReferencePathConflict
Unable to apply step "ExecutionDetails" to input [{"messageId":"xyz","receiptHandle":"abc","body":"{\"id\": 1, \"price\": 100}","attributes":{...}]

Input message:

{"id": 1, "price": 100}

In the following graph, each time a message is processed and deleted, GetQueueAttributes is executed to check ApproximateNumberOfMessages. If there is another message in the queue (ApproximateNumberOfMessages > 0), it goes back to the ReceiveMessage state.

[Image: state machine workflow graph]

State machine definition:

{
  "StartAt": "ListExecutions",
  "States": {
    "ListExecutions": {
      "Type": "Task",
      "Next": "Check Status",
      "Parameters": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "StatusFilter": "RUNNING"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
      "ResultSelector": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "uuid.$": "States.UUID()",
        "runningExecutionsCount.$": "States.ArrayLength($.Executions)"
      },
      "ResultPath": "$.ExecutionDetails"
    },
    "Check Status": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.ExecutionDetails.runningExecutionsCount",
          "NumericEquals": 1,
          "Next": "ReceiveMessage"
        }
      ],
      "Default": "Pass"
    },
    "ReceiveMessage": {
      "Type": "Task",
      "Next": "Dummy Processing",
      "Parameters": {
        "MaxNumberOfMessages": 1,
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/xxx/test-sqs.fifo"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)",
        "ReceiptHandle.$": "$.Messages[0].ReceiptHandle"
      }
    },
    "Pass": {
      "Type": "Pass",
      "End": true
    },
    "Dummy Processing": {
      "Type": "Pass",
      "Next": "DeleteMessage"
    },
    "DeleteMessage": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/xxx/test-sqs.fifo",
        "ReceiptHandle.$": "$.ReceiptHandle"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
      "Next": "GetQueueAttributes"
    },
    "GetQueueAttributes": {
      "Type": "Task",
      "Parameters": {
        "AttributeNames": [
          "ApproximateNumberOfMessages"
        ],
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/xxx/test-sqs.fifo"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:getQueueAttributes",
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Attributes.ApproximateNumberOfMessages",
          "StringEquals": "0",
          "Next": "Done"
        }
      ],
      "Default": "ReceiveMessage"
    },
    "Done": {
      "Type": "Pass",
      "End": true
    }
  }
}

Solution

  • Your Step Function receives an array of SQS messages as input, as the error message shows. In the ResultPath field, $ therefore refers to an array. "ResultPath": "$.ExecutionDetails" cannot be applied because a named field can only be added to an object, not to an array.

    You need to extract the message item from the input array first. If you can count on exactly one message per execution, add a Pass state whose Parameters field uses the "States.ArrayGetItem($, 0)" intrinsic function to extract that sole item. If an execution might receive more than one message, add a Map state to process each item instead.
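
As a minimal sketch of the single-message case, a Pass state could be placed in front of ListExecutions. The state name ExtractMessage and the temporary key "message" are made up for illustration. Parameters has to be a JSON object, so the sketch wraps the extracted item in that key and then unwraps it with OutputPath. The definition is cut off after ListExecutions to keep it short; in the real state machine, ListExecutions would keep its "Next": "Check Status".

```json
{
  "Comment": "Sketch only: unwrap the single SQS message before the first Task state",
  "StartAt": "ExtractMessage",
  "States": {
    "ExtractMessage": {
      "Comment": "Hypothetical state; assumes the Pipe always delivers exactly one message",
      "Type": "Pass",
      "Parameters": {
        "message.$": "States.ArrayGetItem($, 0)"
      },
      "OutputPath": "$.message",
      "Next": "ListExecutions"
    },
    "ListExecutions": {
      "Comment": "Unchanged from the question, except that it ends the sketch here",
      "Type": "Task",
      "Parameters": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "StatusFilter": "RUNNING"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
      "ResultSelector": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "uuid.$": "States.UUID()",
        "runningExecutionsCount.$": "States.ArrayLength($.Executions)"
      },
      "ResultPath": "$.ExecutionDetails",
      "End": true
    }
  }
}
```

With this in place, the input to ListExecutions is the message object itself (messageId, receiptHandle, body, ...), so "$.ExecutionDetails" has an object to attach to.

If more than one message may arrive per execution, the Map variant might look roughly like the following. Again the state names are hypothetical, and the inner Pass state merely stands in for whatever per-message processing is needed.

```json
{
  "Comment": "Sketch only: iterate over every SQS message in the input array",
  "StartAt": "ProcessEachMessage",
  "States": {
    "ProcessEachMessage": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 1,
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "ParseBody",
        "States": {
          "ParseBody": {
            "Comment": "Placeholder for the real per-message work",
            "Type": "Pass",
            "Parameters": {
              "messageId.$": "$.messageId",
              "body.$": "States.StringToJson($.body)"
            },
            "End": true
          }
        }
      },
      "End": true
    }
  }
}
```

Setting MaxConcurrency to 1 keeps the iterations sequential, which matches the stated goal of processing one message at a time.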