I have an Angular 6 app which requests data from AWS Lambda. The data itself is stored in a Glue database and queried with AWS Athena.
The AWS Glue database has the `skip.header.line.count=1` option set, and when I run Athena queries in the console I get a response that does not have a header.
The problem occurs when I try to retrieve the data with `boto3`. I have a function that runs a query and then paginates through the results:
def run_query_paged(self, query, page_token=None, page_size=10, skip_header=False):
    """
    Run an Athena query and return a single page of its results.

    Starts the query, polls ``get_query_execution`` every 200 ms until the
    state is SUCCEEDED, FAILED or CANCELLED, then reads one page through the
    ``get_query_results`` paginator.

    :param query: SQL text passed to ``start_query_execution``.
    :param page_token: token used as the paginator's ``StartingToken``;
        a falsy value requests the first page.
    :param page_size: number of rows requested per page.
    :param skip_header: opt-in (default ``False`` keeps the previous
        behaviour). When true and the first page is requested, one extra row
        is fetched and the leading row is dropped if its values equal the
        column names, so the first page carries ``page_size`` data rows.
        If the leading row is not a header, the page is returned untouched
        and therefore holds ``page_size + 1`` rows.
    :return: ``{'rows': [...], 'nextToken': ...}`` on success (``nextToken``
        is ``''`` when the page has no ``NextToken``); ``None`` when no
        execution id was returned or the query did not succeed or fail
        (e.g. CANCELLED).
    :raises Exception: when the query ends in the FAILED state; the message
        is the ``StateChangeReason`` when Athena supplies one.
    """
    # NOTE(review): every call starts a new execution, so a page_token handed
    # back by an earlier call belongs to a different QueryExecutionId --
    # confirm that this is intended and that such tokens stay valid.
    request = self.athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': self.database},
        ResultConfiguration={'OutputLocation': self.s3_output},
    )
    execution_id = request['QueryExecutionId']
    if not execution_id:
        return None

    # Poll until the query reaches a terminal state.
    while True:
        stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
        status = stats['QueryExecution']['Status']
        state = status['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(0.2)  # 200ms

    if state == 'FAILED':
        # StateChangeReason may be absent; fall back to a generic message
        # instead of masking the failure with a KeyError.
        raise Exception(status.get('StateChangeReason', 'Athena query failed'))
    if state != 'SUCCEEDED':
        return None

    # Only the first page can start with the header row.
    drop_header = bool(skip_header) and not page_token
    # NOTE(review): the service limits the page size (MaxResults); with
    # skip_header the first request asks for page_size + 1 rows -- confirm
    # callers stay within that limit.
    fetch_size = page_size + 1 if drop_header else page_size

    pagination_config = {
        'MaxItems': fetch_size,
        'PageSize': fetch_size,
    }
    if page_token:
        pagination_config['StartingToken'] = page_token

    paginator = self.athena_client.get_paginator('get_query_results')
    pages = paginator.paginate(
        QueryExecutionId=execution_id,
        PaginationConfig=pagination_config
    )

    # Only the first page is consumed; the caller drives further paging
    # through the returned token.
    page = next(iter(pages), None)
    if page is None:
        return {'rows': [], 'nextToken': ''}

    if drop_header:
        result_set = page['ResultSet']
        rows = result_set['Rows']
        if rows:
            column_names = [
                column['Name']
                for column in result_set['ResultSetMetadata']['ColumnInfo']
            ]
            leading_values = [cell.get('VarCharValue') for cell in rows[0]['Data']]
            # Drop the row only when it really is the header, so result
            # sets returned without one do not lose data.
            if leading_values == column_names:
                result_set['Rows'] = rows[1:]

    return {
        'rows': process_results(page),
        'nextToken': page.get('NextToken', '')
    }
The `process_results` function converts the response to a list, taking the column types into account:
def process_results(response):
    """
    Convert a ``get_query_results`` response into a list of row dicts.

    Every entry of ``response['ResultSet']['Rows']`` becomes one dict keyed
    by column name. Each cell's ``VarCharValue`` (or ``''`` when the cell has
    none) is passed to ``process_row_value`` together with the metadata of
    the column at the same position, and the returned value is stored.

    :param response: a page as returned by ``get_query_results``.
    :return: list of ``{column name: processed value}`` dicts, in row order.
    """
    rows = response['ResultSet']['Rows']
    columns = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
    return [
        {
            # Cells are matched to their column metadata by position.
            columns[idx]['Name']: process_row_value(
                cell.get('VarCharValue', ''), columns[idx]
            )
            for idx, cell in enumerate(row['Data'])
        }
        for row in rows
    ]
The problem is that the first page of the paginated response has the header with column names like this:
{
"foo": "foo",
"bar": "bar"
},
{
"foo": 1,
"bar": 2
},
...
while all other pages do not have it. When I request the first page from my client-side app I get a header plus 9 rows (page size is 10), and when I request the next page using `NextToken` I get 10 rows back without a header. It is quite awkward to show 9 items on the first page and 10 items on all subsequent ones.
How can I paginate through the results skipping the header?
I haven't found any option to skip the header, and hacked it by requesting `page_size + 1` results in the first request, then `page_size` for the rest.
def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
    """
    Returns the query result for the provided page as well as a token to the next page if there are more
    results to retrieve for the query.

    :param execution_id: passed to the paginator as ``QueryExecutionId``.
    :param starting_token: when truthy, used as the paginator's ``StartingToken``; a falsy value means
        the first page is being requested.
    :param page_size: number of rows wanted per page; raised by one for the first page to make room
        for the header row.
    """
    paginator = self.athena_client.get_paginator('get_query_results')
    # The first page of response contains header. Increase the page size for a first page and then
    # remove header so that all the pages would have the same size.
    if starting_token:
        skip_header = False
    else:
        page_size += 1
        skip_header = True
    # Ask for up to two pages' worth of items so the loop below can look at the page that follows
    # the one being returned.
    max_items = page_size * 2
    # NOTE(review): MAXIMUM_ALLOWED_ITEMS_NUMBER is defined outside this snippet -- presumably the
    # service's upper limit on page size; confirm.
    pagination_config = {
        'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
        'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
    }
    if starting_token:
        pagination_config['StartingToken'] = starting_token
    response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)
    iterator_index = 0
    results = EMPTY_ATHENA_RESPONSE
    next_token = None
    # Retrieve only a single page and return the next token for the caller to iterate the response.
    for page in response_iterator:
        if iterator_index > 0:
            # Second page: it is only inspected, never returned. If it holds no rows there is
            # nothing left to fetch, so the token taken from the first page is cleared.
            if len(page['ResultSet']['Rows']) == 0:
                next_token = None
            break
        # First page: keep it as the result together with its continuation token.
        next_token = page.get('NextToken')
        results = page
        iterator_index += 1
    # NOTE(review): skip_header is not used in the lines shown here -- presumably the elided
    # processing step below drops the header row when it is set; confirm.
    # ... process and return results