Tags: python-3.x, azure-devops, devops, aws-step-functions

botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) The security token included in the request is expired


I have a token-expiry issue: the STS session token currently expires after 60 minutes, but this step function runs for more than 17 hours. I need to either catch the expiry exception or re-assume the role from within the Python script, without breaking or stopping the Step Functions execution. The policy in place cannot be changed, so I need a workaround. Any pointers on caching credentials (for example in AWS Secrets Manager) and using them inside the Python script would help.
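The catch-and-re-assume idea described above can be sketched without any AWS calls. Here `ClientError` is a minimal stand-in for `botocore.exceptions.ClientError`, and `make_client` is a hypothetical factory that re-assumes the role and returns a fresh client; this is an illustration of the retry shape, not a drop-in implementation:

```python
# Sketch only: ClientError mirrors botocore's error shape; make_client is a
# hypothetical callable that re-assumes the role and returns a new client.

class ClientError(Exception):
    """Minimal stand-in for botocore.exceptions.ClientError."""
    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}

def call_with_reauth(call, client, make_client):
    """Run call(client); on ExpiredTokenException, rebuild the client and retry once."""
    try:
        return call(client)
    except ClientError as err:
        if err.response["Error"]["Code"] != "ExpiredTokenException":
            raise  # only the expired-token case is retried
        return call(make_client())  # fresh credentials, single retry
```

In the real script, `call` would wrap `sf_client.describe_execution(...)` and `make_client` would call `sts.assume_role` and build a new `stepfunctions` client from the returned credentials.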

Error :

> Status...: RUNNING
> Status...: RUNNING
Traceback (most recent call last):
    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
  File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 401, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 731, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the DescribeExecution operation: The security token included in the request is expired
##[error]The process '/opt/hostedtoolcache/Python/3.8.12/x64/bin/python' failed with exit code  1

Python code:

import os
import logging
import snowflake.connector
from argparse import ArgumentParser
from datetime import datetime
from typing import Tuple
import time
from time import sleep
import boto3
import json
from botocore.exceptions import ClientError


sts_client = boto3.client('sts')

session = boto3.Session()
# Avoid printing credentials; this line was only here for debugging.
# print(session.get_credentials().secret_key)

sf_client = boto3.client('stepfunctions', region_name="us-west-2")
sf_output = sf_client.start_execution(
    stateMachineArn='arn:aws:states:us-west-2:xxxxx:stateMachine:PredictiveAnalyticsPipelineOrchestration-xxxxxxxx',
    # input=json.dumps({})  # this is for all basins
    # input='{"basin_list": "PEDREGOSA_BASIN"}'
    input='{"basin_list": ["RIpe"], "db": "RAMAN", "pipelinePhases": "lifestage,monthly_production,tiers,sequence,quintiles"}'
)

while True:
    time.sleep(10)   # don't need to check every nanosecond

    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
    step_status = sf_response['status'] # BE SURE TO GET THE CURRENT STATE

    print("%s: %s" % ("> Status...", step_status))

    if step_status == 'RUNNING':
        continue
    elif step_status == 'FAILED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
        break
    elif step_status == 'TIMED_OUT':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    elif step_status == 'ABORTED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    else:  # status is SUCCEEDED
        print(step_status)
        print(f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        break

Pipeline code :

jobs:
  - job: determine_the_stepfunction_status
    timeoutInMinutes: 5000
    cancelTimeoutInMinutes: 3
    steps:

      - task: AWSAssumeRole@1
        displayName: 'Login to AWS'
        inputs:
          RoleArn: 'arn:aws:iam::$(AWS_ACCOUNT_ID):role/Energyxxxxxx-xxxxx-Role'
          SessionName: 'Energyxxxxxx-xxxxx-Role'
          ConfigureAWSCLIEnvironmentVariables: true
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '3.8'
          #addToPath: true
          #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
      - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
        displayName: 'Install python tools'
     
      - task: PythonScript@0
        env:
           STEP_STATUS: $(step_status)
           AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
           AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
           AWS_SESSION_TOKEN: $(AWS.SessionToken)
                    
        inputs:
          scriptSource: 'filePath' # Options: filePath, inline
          scriptPath: 'step_function.py'
          
          failOnStderr: false # Optional

Solution

  • This has been resolved by refreshing the session from Python every 45 minutes. The adjusted script:

    import os
    import logging
    import snowflake.connector
    from argparse import ArgumentParser
    from datetime import datetime
    from typing import Tuple
    import time
    from time import sleep
    import boto3
    import botocore
    import json
    import base64
    import botocore.session
    from botocore.credentials import AssumeRoleCredentialFetcher, DeferredRefreshableCredentials
    from botocore.exceptions import ClientError
    from botocore.session import get_session
    
    accounts = [
        {"name": "Prod", "id": "youraccountid"},
        #{"name": "Account2", "id": "xxxxxxxxxxx"}  # you can add this if you have multiple account
    ]
    
    regions = [ "us-west-2" ]
    
    boto3.setup_default_session()
    
    # The assumed-role session lasts at most the role's maximum session
    # duration (AWS default is 1 hour; role chaining, e.g. via saml2aws,
    # imposes a hard 1-hour limit), so credentials are refreshed well before
    # that: DurationSeconds=3000 gives a 50-minute session, refreshed on
    # roughly a 45-minute cycle.
    def refresh_external_credentials():
        # Assume role, get details
        client = boto3.client('sts')
        credentials = client.assume_role(
            RoleArn='arn:aws:iam::youraccountid:role/Energyxxxxxx-xxxxx-Role',
            RoleSessionName="Energyxxxxxx-xxxxx-Role", # this name does not matter
            DurationSeconds=3000
        ).get("Credentials")
        return {
            "access_key": credentials.get('AccessKeyId'),
            "secret_key": credentials.get('SecretAccessKey'),
            "token": credentials.get('SessionToken'),
            "expiry_time": credentials.get('Expiration').isoformat()
        }
    
    for account in accounts:
        id = account.get('id')
        accountName = account.get('name')
        
        # Target role in this account; note that refresh_external_credentials
        # above hard-codes the same ARN, so keep the two in sync
        roleArn = 'arn:aws:iam::' + str(id) + ':role/Energyxxxxxx-xxxxx-Role'
        
        credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
            metadata=refresh_external_credentials(),
            refresh_using=refresh_external_credentials,
            method="sts-assume-role",
        )
        
        for region in regions:
            session = get_session()
            session._credentials = credentials
            session.set_config_variable("region", region)
            autorefresh_session = boto3.session.Session(botocore_session=session)
            
            # Your boto3 calls, for example...
            #rds = autorefresh_session.client('rds')
            #databases = rds.describe_db_instances()
            sf_client = autorefresh_session.client('stepfunctions')
    sf_output = sf_client.start_execution(
        stateMachineArn='arn:aws:states:us-west-2:youraccountid:stateMachine:name_of_your_state_machine',
        # input=json.dumps({})  # this is for all basins
        # input='{"basin_list": "PEDREGOSA_BASIN"}'
        input='{"basin_list": ["POwer", "BOW"], "db_postfix": "schemaname", "pipe": "etl,frac,ecline,capex,breakeven,lifestage"}'
    )
    
    
    
    while True:
        time.sleep(10)   # don't need to check every nanosecond
    
        sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
        step_status = sf_response['status'] # BE SURE TO GET THE CURRENT STATE
    
        print("%s: %s" % ("> Status...", step_status))
    
        if step_status == 'RUNNING':
            continue
        elif step_status == 'FAILED':
            print(step_status)
            print (f'##vso[task.setvariable variable=step_status]{step_status}')
            print(sf_response)
            #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
            break
        elif step_status == 'TIMED_OUT':
            print(step_status)
            print (f'##vso[task.setvariable variable=step_status]{step_status}')
            
            break
        elif step_status == 'ABORTED':
            print(step_status)
            print (f'##vso[task.setvariable variable=step_status]{step_status}')
            
            break
        else:  # status is SUCCEEDED
            print(step_status)
            print(f'##vso[task.setvariable variable=step_status]{step_status}')
            print(sf_response)
            break
    
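    The refresh cycle the script relies on can be sketched generically, without AWS calls. Here `fetch` is a hypothetical stand-in for the `refresh_external_credentials()` helper above (anything returning a dict with an `expiry_time` datetime), and the cache re-fetches a configurable margin before the reported expiry:

    ```python
    import datetime as dt

    # Sketch only: fetch() stands in for an sts.assume_role call and must
    # return a dict containing an "expiry_time" datetime.

    class RefreshingCredentials:
        """Cache credentials and re-fetch them shortly before they expire."""

        def __init__(self, fetch, margin=dt.timedelta(minutes=15),
                     now=dt.datetime.utcnow):
            self._fetch = fetch
            self._margin = margin   # refresh this long before expiry
            self._now = now         # injectable clock, for testing
            self._creds = None
            self._expiry = None

        def get(self):
            # Re-fetch on first use, or once we are inside the margin window
            if self._creds is None or self._now() >= self._expiry - self._margin:
                self._creds = self._fetch()
                self._expiry = self._creds["expiry_time"]
            return self._creds
    ```

    With a 60-minute token and a 15-minute margin this produces the same roughly 45-minute refresh cycle the script uses; botocore's `RefreshableCredentials` applies the same pattern internally, with its own advisory and mandatory refresh windows.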

    In the pipeline, the AWSAssumeRole task is no longer needed: removing it avoids a conflict between its credentials and the assume-role call made inside the Python script.

    jobs:
      - job: determine_the_stepfunction_status
        timeoutInMinutes: 5000
        cancelTimeoutInMinutes: 3
        steps:
    
          - task: UsePythonVersion@0
            inputs:
              versionSpec: '3.8'
              #addToPath: true
              #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
          - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
            displayName: 'Install python tools'
         
          - task: PythonScript@0
            env:
               STEP_STATUS: $(step_status)
               AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
               AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
               AWS_SESSION_TOKEN: $(AWS.SessionToken)
                        
            inputs:
              scriptSource: 'filePath' # Options: filePath, inline
              scriptPath: 'step_function.py'
              
              failOnStderr: false # Optional