python azure-functions aws-step-functions

boto3.client('stepfunctions').get_activity_task() hangs


I have a project to implement where I need to communicate with a Step Function that another department created. I'm new to Step Functions so please bear with me if I'm missing anything.

We have a UI where a user can request their data or have their data deleted. This request is sent to an API Gateway and then to a Step Function which creates multiple workers/subscriptions. My task is to create an Azure Function (using Python) to process the tasks and link to the relevant places where we have data and either delete it or return it to an S3 bucket. I thus have the below script:

import datetime
import logging
import boto3
import os
import json

# `creds` is a mapping of settings loaded elsewhere (loading code not shown)
workerName = creds['workerName']
region_name = creds['region_name']
activityArn = creds['activityArn']
aws_access_key_id = creds['aws_access_key_id']
aws_secret_access_key = creds['aws_secret_access_key']
bucket = creds['bucket']

sfn_client = boto3.client(
    service_name='stepfunctions',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

activity = sfn_client.get_activity_task(
    activityArn = activityArn,
    workerName = workerName
)

task_token, task = activity['taskToken'], json.loads(activity['input'])
# TODO Process Task

I notice that every time I run activity = ... I get a single new task rather than a list. From the documentation, I understand that I need to report back with the send_task_failure(), send_task_heartbeat(), and send_task_success() methods, which is fine. Since each call returns one activity task, I was planning to loop until there are no more tasks. However, when I reach the end (or when there are no tasks to run), the script just hangs until it times out.

Is there a way to get a count of only the unstarted activities so that I can loop over them, or is there a better approach?


Solution

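  • After reading through the documentation, I found that I had to set a read_timeout greater than the default. get_activity_task() long-polls for up to 60 seconds, and botocore's default read_timeout is also 60 seconds, so I added 5 seconds for a 65-second timeout, as shown below: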

    import datetime
    import logging
    import boto3
    from botocore.client import Config
    import os
    import json
    
    connect_timeout = creds['connect_timeout'] + 5
    read_timeout = creds['read_timeout'] + 5
    workerName = creds['workerName']
    region_name = creds['region_name']
    activityArn = creds['activityArn']
    aws_access_key_id = creds['aws_access_key_id']
    aws_secret_access_key = creds['aws_secret_access_key']
    bucket = creds['bucket']
    cfg = creds['cfg']
    
    config = Config(
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        retries={'max_attempts': 0}
    )
    
    sfn_client = boto3.client(
        service_name='stepfunctions',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name,
        config=config
    )
    
    while True:
        # Long poll: returns as soon as a task is available, or after ~60s
        activity_task = sfn_client.get_activity_task(
            activityArn=activityArn,
            workerName=workerName
        )

        # An empty poll comes back without 'input' and 'taskToken'
        if 'input' not in activity_task or 'taskToken' not in activity_task:
            print("No more activity tasks")
            break

        taskToken, task = activity_task['taskToken'], json.loads(activity_task['input'])
        # TODO Process Task
    

    On the final pass, once no tasks are left, the call returns a response with the same keys as every other response except that input and taskToken are missing, which is what the check in the loop relies on.
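
Building on the loop above, a worker would normally also report each task's outcome with send_task_success() or send_task_failure(), as the question mentions. The following is a minimal sketch under stated assumptions, not the original author's code: drain_activity_tasks and handler are hypothetical names, and the client is passed in as an argument so the loop can be tried with a stand-in object instead of a real Step Functions client.

```python
import json


def drain_activity_tasks(sfn_client, activity_arn, worker_name, handler):
    """Poll one activity until an empty response comes back.

    `handler` is a hypothetical callable that takes the decoded task input
    and returns a JSON-serialisable result. Returns the number of tasks
    that were handed to `handler`.
    """
    processed = 0
    while True:
        # Long poll: blocks for up to about 60 seconds when nothing is queued.
        response = sfn_client.get_activity_task(
            activityArn=activity_arn,
            workerName=worker_name,
        )

        task_token = response.get('taskToken')
        if not task_token:
            # Missing or empty token: no task was scheduled during the poll.
            return processed

        try:
            result = handler(json.loads(response['input']))
        except Exception as exc:
            # Tell Step Functions the task failed so the state machine can react.
            sfn_client.send_task_failure(
                taskToken=task_token,
                error=type(exc).__name__,
                cause=str(exc),
            )
        else:
            # The output must be a JSON string.
            sfn_client.send_task_success(
                taskToken=task_token,
                output=json.dumps(result),
            )
        processed += 1
```

With a real client, this would be called with the sfn_client configured above (including the 65-second read_timeout), for example drain_activity_tasks(sfn_client, activityArn, workerName, handler), where handler is whatever function deletes or exports the requested data. For long-running tasks, send_task_heartbeat() may also be needed if the state machine defines a heartbeat timeout.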