Search code examples
c#amazon-web-servicesaws-lambdaaws-step-functions

AWS Unreachable state when using a custom state


I am new to using step functions, and I am having a hard time figuring out why my state is considered unreachable. I assume this is the case given the error given in CloudFormation when my pipeline is ran for my state machine

Resource handler returned message: "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: Expected value of type [ARRAY] at /States/ProcessFile/Catch, MISSING_TRANSITION_TARGET: State "TerminalState" is not reachable. at /States/ProcessFile/ItemProcessor/States/TerminalState' (Service: AWSStepFunctions; Status Code: 400; Error Code: InvalidDefinition; Request ID: cdbda38e-12d5-4dee-81a3-020a2a00cd32; Proxy: null)" (RequestToken: 81bdef9c-408a-d6b2-af56-e1f42eb54c50, HandlerErrorCode: InvalidRequest)

I define my state machine as follows, because this is a custom state, I had to manually roll a lot of the definitions as is shown in AWS's documentation for custom state using C#.

IDictionary<string, object> stateJson = new Dictionary<string, object> {
                { "Type", "Map" },
                { "ItemProcessor", new Dictionary<string, object> {
                    { "ProcessorConfig", new Dictionary<string, object> {
                        { "Mode", "DISTRIBUTED" },
                        { "ExecutionType", "EXPRESS" }
                    } },
                    { "StartAt", "AudienceListUserValidation" },
                    { "States", new Dictionary<string, object> {
                        { "AudienceListUserValidation", new Dictionary<string, object> 
                            {
                                { "Type", "Task" },
                                { "Resource", "arn:aws:states:::lambda:invoke" },
                                { "Parameters", new Dictionary<string, object> {
                                    { "FunctionName", userValidationfunction.Function.FunctionName },
                                    { "Payload.$", "$" }
                                } },
                                { "ResultPath",  JsonPath.DISCARD },
                                { "Catch", catchSerialized },
                                { "End", true },
                            } 
                        },
                        { "TerminalState", new Dictionary<string, object>
                            {
                                { "Type", "Task"},
                                { "Resource", "arn:aws:states:::lambda:invoke"},
                                { "Parameters", new Dictionary<string, object>
                                    {
                                        { "FunctionName", errorFunction.Function.FunctionName },
                                        { "Payload.$", "$" }
                                    }
                                },
                                { "End", true }
                            }
                        }
                    } }
                } },
                { "Label", "ProcessFile" },
                { "MaxConcurrency", 5 },
                { "ItemReader", new Dictionary<string, object> 
                    {
                        { "Resource", "arn:aws:states:::s3:getObject" },
                        { "ReaderConfig", new Dictionary<string, object> 
                            {
                                { "InputType", "CSV" },
                                { "CSVHeaderLocation", "FIRST_ROW" }
                            } 
                        },
                        { "Parameters", new Dictionary<string, object> 
                            {
                                { "Bucket.$", "$.S3BucketName" },
                                { "Key.$", "$.S3FileName" }
                            } 
                        }
                    } 
                },
                { "ItemSelector", new Dictionary<string, object>
                    {
                        { "MatchingFieldId.$", "$$.Map.Item.Value.Email"},
                        { "FileUploadId.$", "$.FileUploadId"}
                    }
                },
                { "ResultPath",  JsonPath.DISCARD } 
            };

Whereby catchSerialized is defined as such

var catchSerialized = JsonConvert.SerializeObject(new[] { 
                new { 
                    ErrorEquals = new[] { "States.ALL" }, 
                    Next = "TerminalState" 
                } 
            });

This was done because Catch excepts an array and it would've been an absolute nightmare to try and configure it otherwise. As far as I am aware, by having a StartAt it would tell the state machine that we must first execute the AudienceListUserValidation step, then if it fails for whatever reason it should hit my Terminal state which invokes my specific lambda for handling the error. I looked at this github post (https://github.com/aws/aws-toolkit-vscode/issues/1117) and it appears that order is not my issue given issue. My set up seems to be correct when I look at this documentation as well https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-handling-error-conditions.html

Where have I gone awry? If it helps I am using the CDK, given I am using the CDK do I even need to define that error state in my catch and could I just rely on the OnFailure property instead? Ex:

audienceListUserValidationFuncProps.OnFailure = new LambdaDestination(errorFunction.Function, new LambdaDestinationOptions
            {
                ResponseOnly = true,
            });

Solution

  • I'm not 100% sure why, but don't think your catchSerialized is being generated properly. I created the following ASL to model what you were doing and it saves fine.

    {
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "ItemProcessor": {
            "ProcessorConfig": {
              "Mode": "DISTRIBUTED",
              "ExecutionType": "EXPRESS"
            },
            "StartAt": "AudienceListUserValidation",
            "States": {
              "AudienceListUserValidation": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "arn:aws:lambda:123456789012:function:userValidationFunction"
                },
                "End": true,
                "ResultPath": null,
                "Catch": [
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "Next": "TerminalState"
                  }
                ]
              },
              "TerminalState": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "arn:aws:lambda:123456789012:function:errorFunction"
                },
                "End": true
              }
            }
          },
          "End": true,
          "MaxConcurrency": 5,
          "Label": "ProcessFile",
          "ItemReader": {
            "Resource": "arn:aws:states:::s3:getObject",
            "ReaderConfig": {
              "InputType": "CSV",
              "CSVHeaderLocation": "FIRST_ROW"
            },
            "Parameters": {
              "Bucket.$": "$.S3BucketName",
              "Key.$": "$.S3FileName"
            }
          },
          "ItemSelector": {
            "MatchingFieldId.$": "$$.Map.Item.Value.Email",
            "FileUploadId.$": "$.FileUploadId"
          },
          "ResultPath": null
        }
      }
    }
    

    But if I remove the catcher object from the Catch array, as in the example below, I get the error you are reporting.

    enter image description here

    {
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "ItemProcessor": {
            "ProcessorConfig": {
              "Mode": "DISTRIBUTED",
              "ExecutionType": "EXPRESS"
            },
            "StartAt": "AudienceListUserValidation",
            "States": {
              "AudienceListUserValidation": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "arn:aws:lambda:123456789012:function:userValidationFunction"
                },
                "End": true,
                "ResultPath": null,
                "Catch": [
                  
                ]
              },
              "TerminalState": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "arn:aws:lambda:123456789012:function:errorFunction"
                },
                "End": true
              }
            }
          },
          "End": true,
          "MaxConcurrency": 5,
          "Label": "ProcessFile",
          "ItemReader": {
            "Resource": "arn:aws:states:::s3:getObject",
            "ReaderConfig": {
              "InputType": "CSV",
              "CSVHeaderLocation": "FIRST_ROW"
            },
            "Parameters": {
              "Bucket.$": "$.S3BucketName",
              "Key.$": "$.S3FileName"
            }
          },
          "ItemSelector": {
            "MatchingFieldId.$": "$$.Map.Item.Value.Email",
            "FileUploadId.$": "$.FileUploadId"
          },
          "ResultPath": null
        }
      }
    }