python, amazon-web-services, aws-step-functions

How to share data in `AWS Step Functions` without passing it between the steps


I use AWS Step Functions and have the following workflow:

[Image: AWS Step Functions workflow]

initStep - A Lambda function handler that gets some data and sends it to SQS for an external service.

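import json
import os
from datetime import datetime

import boto3

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
  # Include the activity identifier in the message sent to the external service
  event['my_activity'] = activity
  data = json.dumps(event)

  # Retrieving a queue by its name
  sqs = boto3.resource('sqs')
  queue = sqs.get_queue_by_name(QueueName=queue_name)

  # The time-based suffix makes the message group ID differ per message
  queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.now().time()))

  # The returned event becomes the input of the next state
  return event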

validationWaiting - An activity that waits for a response from the external service; the response includes the data.

complete - A Lambda function handler that uses the data from initStep.

import boto3

def lambda_handler(event, context):
  # Fall back to None when a key is missing from the input
  email = event.get('email')
  data = event.get('data')

  client = boto3.client(service_name='ses')
  to = email.split(', ')
  message_container = {'Subject': {'Data': 'Email from step functions'},
           'Body': {'Html': {
               'Charset': "UTF-8",
               'Data': """<html><body>
                            <p>""" + data + """</p>
                            </body> </html> """
           }}}

  destination = {'ToAddresses': to,
               'CcAddresses': [],
               'BccAddresses': []}

  # from_addresses is not defined in this snippet; it is assumed to be set elsewhere
  return client.send_email(Source=from_addresses,
                         Destination=destination,
                         Message=message_container)

It works, but the problem is that I'm sending the full data from initStep to the external service just so that it can be passed on to complete later. More steps may be added in the future.

I believe it would be better to share it as some sort of global data (of the current step function), so that I could add or remove steps and the data would still be available to all of them.


Solution

  • Based on Marcin Sucharski's answer, I came up with my own solution.

    I needed to use Type: Task since initStep is a Lambda that sends a message to SQS.

    I didn't need InputPath in ValidationWaiting, only ResultPath, which stores the data received from the activity under $.validationOutput and leaves the rest of the state input untouched.

    I work with the Serverless Framework; here is my final solution:

    StartAt: initStep
    States: 
      initStep:
        Type: Task
        Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:init-step
        Next: ValidationWaiting
      ValidationWaiting:
        Type: Task
        ResultPath: $.validationOutput
        Resource: arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:activity:validationActivity
        Next: Complete
        Catch:
          - ErrorEquals:
              - States.ALL
            ResultPath: $.validationOutput
            Next: Complete
      Complete:
        Type: Task
        Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:complete-step
        End: true
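
To make the effect of ResultPath concrete, here is a minimal sketch that approximates the merge locally in plain Python. It is not how the service is implemented, only an illustration of the resulting shape. The helper apply_result_path and the sample values (the email addresses, the data string, the "approved" field) are hypothetical and exist only for this example; the sketch handles only simple paths such as "$.validationOutput".

```python
import copy
import json


def apply_result_path(state_input, result, result_path):
    """Approximate how a task result is merged into the state input.

    Hypothetical helper for illustration: only "$" and simple "$.a.b"
    paths are handled here.
    """
    if result_path == "$":
        # The default ResultPath: the result replaces the whole input
        return result
    output = copy.deepcopy(state_input)
    keys = result_path[2:].split(".")  # strip the leading "$."
    node = output
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = result
    return output


# Assumed sample values: what initStep returned and what the activity worker sent back
init_output = {"email": "a@example.com, b@example.com", "data": "report body"}
activity_result = {"approved": True}

# This is roughly the event that the Complete step would receive
complete_input = apply_result_path(init_output, activity_result, "$.validationOutput")
print(json.dumps(complete_input, indent=2))
```

With this shape, the complete handler can keep reading event['email'] and event['data'] as before, and the activity's answer is available under event['validationOutput']. If the Catch branch fires instead, the same key holds the error object rather than the activity result, so the handler may want to check for that.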