Search code examples
amazon-web-services websocket aws-api-gateway aws-signature

Invoke Api Gateway Websockets API through HTTP


AWS provides SDK methods for interacting with WebSocket API connections: post_to_connection, delete_connection, and get_connection.

But how can we invoke the same APIs directly over HTTP, for example with the Python requests library?

What have I tried?

I'm struggling to find an example of how to sign a request for an API Gateway service.


Solution

  • Here is the code to invoke WebSocket APIs using python-requests:

    In the code below, replace the <aws_region>, <api_id>, and <api_stage> placeholders with your own values.

    You can use get_connection, post_message, and delete_connection to get connection details, post a message to a connection, and delete a connection.

    #!/usr/bin/python
    import os
    from time import sleep
    
    import requests
    import json
    from urllib.parse import quote, urlparse
    import hmac
    import hashlib
    from datetime import datetime
    
    
    def get_canonical_path(url):
        """
        Return the canonical URI for `url`: its path component, percent-encoded.

        The path is everything between the host and the query string. When the
        URL has no path, '/' is used instead.
        """
        raw_path = urlparse(url).path or '/'

        # The safe character set is adapted from boto's use of urllib.parse.quote:
        # https://github.com/boto/boto/blob/d9e5cfe900e1a58717e393c76a6e3580305f217a/boto/auth.py#L393
        return quote(raw_path, safe='/-_.~')
    
    
    def get_canonical_querystring(url):
        """
        Build the canonical query string for `url`.

        AWS Signature Version 4 requires the query parameters to be
        URL-encoded (space=%20) and sorted by parameter name, with repeated
        names ordered by value.

        This function does NOT encode anything: the query parameters in `url`
        must already be URL-encoded by the caller. If they are not, AWS may
        reject the request because the signature does not match.

        Parameters without a value (`flag`) are emitted as `flag=`;
        parameters with an empty name (`=value`) are dropped.

        Returns the canonical query string, or '' when `url` has no query.
        """
        pairs = []
        for param in urlparse(url).query.split('&'):
            key, _, val = param.partition('=')
            if key:
                pairs.append((key, val))

        # Sort (name, value) tuples rather than raw "name=value" strings:
        # comparing the raw strings lets '=' take part in the ordering, which
        # puts e.g. "a1=x" before "a=y" even though the name "a" sorts first.
        return '&'.join(f'{key}={val}' for key, val in sorted(pairs))
    
    
    def sign(key, msg):
        """
        Return the raw HMAC-SHA256 digest of `msg` (UTF-8 encoded) keyed with `key`.

        Adapted from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
        """
        mac = hmac.new(key, digestmod=hashlib.sha256)
        mac.update(msg.encode('utf-8'))
        return mac.digest()
    
    
    def get_signature_key(secret_key, date_stamp, region_name, service_name):
        """
        Derive the SigV4 signing key by chaining `sign` over the credential scope.

        Starting from 'AWS4' + `secret_key`, each step signs the next scope
        element (date, region, service, then the literal 'aws4_request') with
        the digest produced by the previous step.

        Adapted from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
        """
        derived_key = ('AWS4' + secret_key).encode('utf-8')
        for scope_part in (date_stamp, region_name, service_name, 'aws4_request'):
            derived_key = sign(derived_key, scope_part)
        return derived_key
    
    
    def get_headers(aws_host: str, url: str, request_body: "bytes | str", aws_region: str, service: str,
                    aws_access_key: str, aws_secret_access_key: str, method: str) -> dict:
        """
        Build the HTTP headers for an AWS Signature Version 4 signed request.

        Parameters:
            aws_host: value signed as the `host` header; it must match the host in `url`.
            url: full request URL; its path and query string are signed.
            request_body: payload that will be sent. bytes are hashed as-is,
                a str is UTF-8 encoded first, and None is treated as an empty body.
            aws_region: region used in the credential scope.
            service: service name used in the credential scope.
            aws_access_key: access key id placed in the Authorization header.
            aws_secret_access_key: secret key used to derive the signing key.
            method: HTTP method, in any letter case.

        Returns:
            A dict with the 'Authorization', 'x-amz-date',
            'x-amz-content-sha256' and 'Content-Type' headers.
        """
        # Local import: the file header only imports the `datetime` class.
        from datetime import timezone

        method = method.upper()

        # hashlib only accepts bytes, so normalise the body before hashing.
        if request_body is None:
            request_body = b''
        elif isinstance(request_body, str):
            request_body = request_body.encode('utf-8')

        # One timestamp feeds both the x-amz-date header and the credential
        # scope. An aware UTC "now" formats identically to the deprecated
        # datetime.utcnow().
        now = datetime.now(timezone.utc)
        amzdate = now.strftime('%Y%m%dT%H%M%SZ')
        date_stamp = now.strftime('%Y%m%d')  # Date w/o time for credential_scope

        canonical_uri = get_canonical_path(url)
        canonical_querystring = get_canonical_querystring(url)

        # Canonical headers: lowercase names, sorted, each terminated by '\n'
        # (so the block itself ends with a trailing newline).
        canonical_headers = 'host:' + aws_host + '\n' + 'x-amz-date:' + amzdate + '\n'

        # Names of the headers covered by the signature, ';'-delimited and in
        # the same order as canonical_headers.
        signed_headers = 'host;x-amz-date'

        payload_hash = hashlib.sha256(request_body).hexdigest()

        # canonical_headers already ends with '\n', so joining with '\n' yields
        # the blank line expected between the headers and signed_headers.
        canonical_request = '\n'.join([
            method,
            canonical_uri,
            canonical_querystring,
            canonical_headers,
            signed_headers,
            payload_hash,
        ])

        # The algorithm name must match the hash used throughout (SHA-256).
        algorithm = 'AWS4-HMAC-SHA256'
        credential_scope = '/'.join([date_stamp, aws_region, service, 'aws4_request'])
        string_to_sign = '\n'.join([
            algorithm,
            amzdate,
            credential_scope,
            hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
        ])

        signing_key = get_signature_key(secret_key=aws_secret_access_key, date_stamp=date_stamp,
                                        region_name=aws_region, service_name=service)

        signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

        # The signing information travels in the Authorization header.
        # str.join (rather than an f-string) keeps a missing access key a
        # TypeError instead of silently signing as "None".
        authorization_header = ''.join([
            algorithm, ' ',
            'Credential=', aws_access_key, '/', credential_scope, ', ',
            'SignedHeaders=', signed_headers, ', ',
            'Signature=', signature,
        ])

        return {
            'Authorization': authorization_header,
            'x-amz-date': amzdate,
            'x-amz-content-sha256': payload_hash,
            'Content-Type': 'application/json',
        }
    
    
    # Deployment-specific settings: replace the three placeholder values below.
    aws_region = '<aws_region>'
    api_id = '<api_id>'
    api_stage = '<api_stage>'

    # Service name used in the SigV4 credential scope.
    service = 'execute-api'

    # Host and @connections base URL are derived from the settings above so
    # each placeholder only has to be replaced once.
    aws_host = f'{api_id}.execute-api.{aws_region}.amazonaws.com'
    connections_base_url = f'https://{aws_host}/{api_stage}/@connections'

    # Payload sent with (and hashed into the signature of) every request.
    body = "Hey everyone From HTTP"
    body = body.encode('utf-8')

    aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")

    # The standard AWS variable is AWS_SECRET_ACCESS_KEY; the "_ID"-suffixed
    # name originally read here is still honoured first for backward compatibility.
    aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY_ID") or os.getenv("AWS_SECRET_ACCESS_KEY")
    
    def delete_connection(connection_id):
        """
        Send a signed DELETE request for `connection_id` and print the outcome.

        The request targets `{connections_base_url}/{connection_id}`, is signed
        with `get_headers` using the module-level settings, and carries the
        module-level `body`. The response text and status code are printed.
        """
        endpoint = f'{connections_base_url}/{connection_id}'

        signed_headers = get_headers(
            aws_host=aws_host,
            url=endpoint,
            request_body=body,
            service=service,
            aws_region=aws_region,
            aws_access_key=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            method="delete",
        )

        reply = requests.delete(url=endpoint, headers=signed_headers, data=body)

        print(f"Deleted {connection_id} connection: {reply.text}", reply.status_code)
    
    
    def post_message(connection_id):
        """
        Send a signed POST request carrying `body` to `connection_id` and print the outcome.

        The request targets `{connections_base_url}/{connection_id}` and is
        signed with `get_headers` using the module-level settings. The
        response text and status code are printed.
        """
        endpoint = f'{connections_base_url}/{connection_id}'

        signed_headers = get_headers(
            aws_host=aws_host,
            url=endpoint,
            request_body=body,
            service=service,
            aws_region=aws_region,
            aws_access_key=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            method="post",
        )

        reply = requests.post(url=endpoint, headers=signed_headers, data=body)

        print(f"Reponse for send msg to {connection_id}: {reply.text}", reply.status_code)
    
    
    def get_connection(connection_id):
        """
        Send a signed GET request for `connection_id` and print the outcome.

        The request targets `{connections_base_url}/{connection_id}`, is signed
        with `get_headers` using the module-level settings, and carries the
        module-level `body`. The response text and status code are printed.
        """
        endpoint = f'{connections_base_url}/{connection_id}'

        signed_headers = get_headers(
            aws_host=aws_host,
            url=endpoint,
            request_body=body,
            service=service,
            aws_region=aws_region,
            aws_access_key=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            method="get",
        )

        reply = requests.get(url=endpoint, headers=signed_headers, data=body)

        print(f"Status: {reply.text}", reply.status_code)
    
    
    # Sample: run each operation once, in order (get, post, delete), against
    # the placeholder connection id below.
    connection_id = "connection_id"

    for _operation in (get_connection, post_message, delete_connection):
        _operation(connection_id)