I use AWS Step Functions and have the following workflow.
initStep - a Lambda function handler that gets some data and sends it to SQS for an external service.
import json
import os
from datetime import datetime

import boto3

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
    # Pass the activity identifier along with the event
    event['my_activity'] = activity
    data = json.dumps(event)
    # Retrieve the queue by its name
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    # MessageGroupId applies to FIFO queues only
    queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.now().time()))
    return event
validationWaiting - an activity that waits for an answer from the external service; the answer includes the data.
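For illustration, the external service's side of such an activity might look roughly like the sketch below. It assumes the service polls with boto3's Step Functions client (`get_activity_task`, then `send_task_success` or `send_task_failure`). The function name `answer_activity_once`, the `validate` callback, the error name and the worker name are hypothetical and not part of the original setup.

```python
import json


def answer_activity_once(sfn_client, activity_arn, validate, worker_name="external-service"):
    """Poll the activity once and report the result back to Step Functions.

    sfn_client is expected to be boto3.client('stepfunctions'); it is passed in
    so the function can be exercised with a stub. validate is a hypothetical
    callback that turns the task input (a dict) into a JSON-serialisable result.
    """
    task = sfn_client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        # The long poll ended without a scheduled task
        return None
    task_input = json.loads(task["input"])
    try:
        result = validate(task_input)
    except Exception as exc:
        # Report the failure so the state machine can handle it (e.g. via Catch)
        sfn_client.send_task_failure(taskToken=token, error="ValidationFailed", cause=str(exc))
        return None
    # The output must be a JSON string; it becomes the result of the activity state
    sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
    return result
```

In a real worker this would run in a loop; passing the client in keeps the sketch independent of credentials and region configuration.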
complete - a Lambda function handler that uses the data from initStep.
import boto3

def lambda_handler(event, context):
    email = event.get('email')
    data = event.get('data')
    client = boto3.client(service_name='ses')
    # Guard against missing fields so split() and the concatenation do not fail on None
    to = email.split(', ') if email else []
    message_container = {'Subject': {'Data': 'Email from step functions'},
                         'Body': {'Html': {
                             'Charset': "UTF-8",
                             'Data': """<html><body>
<p>""" + (data or '') + """</p>
</body> </html> """
                         }}}
    destination = {'ToAddresses': to,
                   'CcAddresses': [],
                   'BccAddresses': []}
    # from_addresses is not defined in this snippet; it is assumed to be set elsewhere
    return client.send_email(Source=from_addresses,
                             Destination=destination,
                             Message=message_container)
It works, but the problem is that I'm sending the full data from initStep to the external service just so it can be passed on to complete later. More steps may be added in the future.
I believe it would be better to share the data as some sort of global data for the current state machine execution; that way I could add or remove steps and the data would still be available to all of them.
Based on Marcin Sucharski's answer, I came up with my own solution.
I needed to use Type: Task, since initStep is a Lambda function that sends a message to SQS.
I didn't need InputPath in ValidationWaiting, only ResultPath, which stores the data received from the activity alongside the state's existing input.
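To make the effect of ResultPath concrete, here is a small Python sketch that mimics what Step Functions does for a simple path such as $.validationOutput: the state's result is inserted into a copy of the state's input instead of replacing it. The helper `apply_result_path` and the sample values are made up for illustration; the real merge happens inside the service, and this sketch handles only plain dotted paths.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Mimic ResultPath for simple dotted paths like '$.a.b' (illustrative only)."""
    if result_path == "$":
        # '$' means the result replaces the whole input
        return result
    if not result_path.startswith("$."):
        raise ValueError("only '$' or '$.key' style paths are supported in this sketch")
    output = copy.deepcopy(state_input)
    keys = result_path[2:].split(".")
    node = output
    # Walk (and create) intermediate objects, then set the final key
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = result
    return output


# Hypothetical values: what initStep returned and what the activity answered
state_input = {"email": "user@example.com", "data": "payload from initStep"}
activity_result = {"approved": True}
merged = apply_result_path(state_input, activity_result, "$.validationOutput")
print(merged)
```

The next state then sees both the original keys and validationOutput, which is why the external service no longer has to echo the full data back.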
I work with the Serverless Framework; here is my final solution:
StartAt: initStep
States:
  initStep:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:init-step
    Next: ValidationWaiting
  ValidationWaiting:
    Type: Task
    ResultPath: $.validationOutput
    Resource: arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:activity:validationActivity
    Next: Complete
    Catch:
      - ErrorEquals:
          - States.ALL
        ResultPath: $.validationOutput
        Next: Complete
  Complete:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:complete-step
    End: true
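With this definition, the Complete function receives the output of initStep plus a validationOutput key. A rough sketch of how the handler could tell a successful answer from a caught error is shown below; when a Catch fires, Step Functions places an object with Error and Cause fields at the ResultPath. The helper name `split_complete_event` and the sample events are hypothetical.

```python
def split_complete_event(event):
    """Separate the original data from the validation outcome (illustrative helper)."""
    validation = event.get("validationOutput")
    # Everything except validationOutput came from the earlier steps
    original = {k: v for k, v in event.items() if k != "validationOutput"}
    # A caught error arrives as an object with 'Error' and 'Cause' keys;
    # this check assumes a successful activity answer never uses both keys
    failed = isinstance(validation, dict) and "Error" in validation and "Cause" in validation
    return original, validation, failed


# Hypothetical inputs to the Complete state
ok_event = {"email": "a@b.c", "data": "x", "validationOutput": {"approved": True}}
err_event = {"email": "a@b.c", "data": "x",
             "validationOutput": {"Error": "States.Timeout", "Cause": "no answer"}}
print(split_complete_event(ok_event))
print(split_complete_event(err_event))
```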