AWS provides us an SDK way to interact with WebSocket APIs, post_to_connection, delete_connection, get_connection
But how can we invoke the APIs from HTTP, which means a kind of python requests
thing?
What I tried?
Here I'm struggling to find an example for how to sign a request for an API Gateway service.
Here is the code to invoke web socket APIs using python-requests:
In the below code, replace aws_region, and api_id with your data.
You can use get_connection, post_connection, delete_connection to get connection details, post msg to connection, and delete connection.
#!/usr/bin/python
import os
from time import sleep
import requests
import json
from urllib.parse import quote, urlparse
import hmac
import hashlib
from datetime import datetime
def get_canonical_path(url):
"""
Create canonical URI--the part of the URI from domain to query
string (use '/' if no path)
"""
parsed_url = urlparse(url)
# safe chars adapted from boto's use of urllib.parse.quote
# https://github.com/boto/boto/blob/d9e5cfe900e1a58717e393c76a6e3580305f217a/boto/auth.py#L393
return quote(parsed_url.path if parsed_url.path else '/', safe='/-_.~')
def get_canonical_querystring(url):
"""
Create the canonical query string. According to AWS, by the
end of this function our query string values must
be URL-encoded (space=%20) and the parameters must be sorted
by name.
This method assumes that the query params in `r` are *already*
url encoded. If they are not url encoded by the time they make
it to this function, AWS may complain that the signature for your
request is incorrect.
It appears elasticsearc-py url encodes query paramaters on its own:
https://github.com/elastic/elasticsearch-py/blob/5dfd6985e5d32ea353d2b37d01c2521b2089ac2b/elasticsearch/connection/http_requests.py#L64
If you are using a different client than elasticsearch-py, it
will be your responsibility to urleconde your query params before
this method is called.
"""
canonical_querystring = ''
parsedurl = urlparse(url)
querystring_sorted = '&'.join(sorted(parsedurl.query.split('&')))
for query_param in querystring_sorted.split('&'):
key_val_split = query_param.split('=', 1)
key = key_val_split[0]
if len(key_val_split) > 1:
val = key_val_split[1]
else:
val = ''
if key:
if canonical_querystring:
canonical_querystring += "&"
canonical_querystring += u'='.join([key, val])
return canonical_querystring
def sign(key, msg):
"""
Copied from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
"""
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def get_signature_key(secret_key, date_stamp, region_name, service_name):
"""
Copied from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
"""
kDate = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, region_name)
kService = sign(kRegion, service_name)
kSigning = sign(kService, 'aws4_request')
return kSigning
def get_headers(aws_host: str, url: str, request_body: str, aws_region: str, service: str, aws_access_key: str,
aws_secret_access_key: str, method: str):
method = method.upper()
# Create a date for headers and the credential string
t = datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d') # Date w/o time for credential_scope
canonical_uri = get_canonical_path(url)
canonical_querystring = get_canonical_querystring(url)
# Create the canonical headers and signed headers. Header names
# and value must be trimmed and lowercase, and sorted in ASCII order.
# Note that there is a trailing \n.
canonical_headers = ('host:' + aws_host + '\n' +
'x-amz-date:' + amzdate + '\n')
# Create the list of signed headers. This lists the headers
# in the canonical_headers list, delimited with ";" and in alpha order.
# Note: The request can include any headers; canonical_headers and
# signed_headers lists those that you want to be included in the
# hash of the request. "Host" and "x-amz-date" are always required.
signed_headers = 'host;x-amz-date'
payload_hash = hashlib.sha256(request_body).hexdigest()
# Combine elements to create create canonical request
canonical_request = (method + '\n' + canonical_uri + '\n' +
canonical_querystring + '\n' + canonical_headers +
'\n' + signed_headers + '\n' + payload_hash)
# Match the algorithm to the hashing algorithm you use, either SHA-1 or
# SHA-256 (recommended)
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = (date_stamp + '/' + aws_region + '/' +
service + '/' + 'aws4_request')
string_to_sign = (algorithm + '\n' + amzdate + '\n' + credential_scope +
'\n' + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())
# Create the signing key using the function defined above.
signing_key = get_signature_key(secret_key=aws_secret_access_key, date_stamp=date_stamp, region_name=aws_region,
service_name=service)
# Sign the string_to_sign using the signing_key
string_to_sign_utf8 = string_to_sign.encode('utf-8')
signature = hmac.new(signing_key,
string_to_sign_utf8,
hashlib.sha256).hexdigest()
# The signing information can be either in a query string value or in
# a header named Authorization. This code shows how to use a header.
# Create authorization header and add to request headers
authorization_header = (algorithm + ' ' + 'Credential=' + aws_access_key +
'/' + credential_scope + ', ' + 'SignedHeaders=' +
signed_headers + ', ' + 'Signature=' + signature)
headers = {
'Authorization': authorization_header,
'x-amz-date': amzdate,
'x-amz-content-sha256': payload_hash,
'Content-Type': 'application/json'
}
return headers
aws_host = '<api_id>.execute-api.<aws_region>.amazonaws.com'
connections_base_url = 'https://<api_id>.execute-api.<aws_region>.amazonaws.com/<api_stage>/@connections'
body = "Hey everyone From HTTP"
body = body.encode('utf-8')
aws_region = '<aws_region>'
service = 'execute-api'
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY_ID")
def delete_connection(connection_id):
url = f'{connections_base_url}/{connection_id}'
headers = get_headers(aws_host=aws_host, url=url, request_body=body, service=service,
aws_region=aws_region, aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key,
method="delete")
# print(headers)
response = requests.delete(url=url, headers=headers, data=body)
print(f"Deleted {connection_id} connection: {response.text}", response.status_code)
def post_message(connection_id):
url = f'{connections_base_url}/{connection_id}'
headers = get_headers(aws_host=aws_host, url=url, request_body=body, service=service,
aws_region=aws_region, aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key,
method="post")
# print(headers)
response = requests.post(url=url, headers=headers, data=body)
print(f"Reponse for send msg to {connection_id}: {response.text}", response.status_code)
def get_connection(connection_id):
url = f'{connections_base_url}/{connection_id}'
headers = get_headers(aws_host=aws_host, url=url, request_body=body, service=service,
aws_region=aws_region, aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key,
method="get")
# print(headers)
response = requests.get(url=url, headers=headers, data=body)
print(f"Status: {response.text}", response.status_code)
# Sample
connection_id = "connection_id"
get_connection(connection_id)
post_message(connection_id="connection_id")
delete_connection(connection_id="connection_id")