Search code examples
pythonamazon-web-servicesboto3amazon-athenaaws-glue

How to skip header when paginating through AWS Athena query results


I have an Angular 6 app which requests data from AWS Lambda. The data itself stored in the Glue database and queried with AWS Athena. The AWS Glue database has the skip.header.line.count=1 option set and when I run Athena queries in console I get a response that does not have a header. The problem occurs when I try to retrieve the data with boto3. I have a function that runs a query and then paginates through the results:

def run_query_paged(self, query, page_token=None, page_size=10):
    """
    Run query.
    """
    request = self.athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': self.database
            },
        ResultConfiguration={
            'OutputLocation': self.s3_output,
            }
        )
    execution_id = request['QueryExecutionId']

    if execution_id:
        while True:
            stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
            status = stats['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(0.2)  # 200ms

        if status == 'SUCCEEDED':
            paginator = self.athena_client.get_paginator('get_query_results')
            pagination_config = {
                'MaxItems': page_size,
                'PageSize': page_size,
            }
            if page_token:
                pagination_config['StartingToken'] = page_token

            response_iterator = paginator.paginate(
                QueryExecutionId=execution_id,
                PaginationConfig=pagination_config
            )

            for page in response_iterator:
                next_token = page.get('NextToken', '')
                results = page
                break

            return {
                'rows': process_results(results),
                'nextToken': next_token
            }
        if status == 'FAILED':
            raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])

    return None

The process_results function converts response to a list taking into account column types:

def process_results(response):
    """
    Processes the result of get_query_results function
    """
    rows = response['ResultSet']['Rows']
    meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
    result = []
    for row in rows:
        parsed_row = {}
        for idx, val in enumerate(row['Data']):
            field = val
            column_info = meta[idx]
            if 'VarCharValue' in val:
                value = val['VarCharValue']
            else:
                value = ''
            parsed_row[column_info['Name']] = process_row_value(value, column_info)
        result.append(parsed_row)
    return result

The problem is that the first page of the paginated response has the header with column names like this:

{
    "foo": "foo",
    "bar": "bar"
},
{
    "foo": 1,
    "bar": 2
},
...

while all other pages does not have it. When I request the first page from my client side app I get a header plus 9 rows (page size is 10) and when I request next page using NextToken then I get 10 rows back without a header. It is quite awkward to show 9 items in the first page and 10 items in all consequent.

How can I paginate through the results skipping the header?


Solution

  • I haven't found any option to skip the header, and hacked it by requesting page_size + 1 results in the first request, then page_size for the rest.

    def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
        """
        Returns the query result for the provided page as well as a token to the next page if there are more
        results to retrieve for the query.
        """
        paginator = self.athena_client.get_paginator('get_query_results')
    
        # The first page of response contains header. Increase the page size for a first page and then
        # remove header so that all the pages would have the same size.
        if starting_token:
            skip_header = False
        else:
            page_size += 1
            skip_header = True
        max_items = page_size * 2
    
        pagination_config = {
            'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
            'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
        }
        if starting_token:
            pagination_config['StartingToken'] = starting_token
    
        response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)
    
    
        iterator_index = 0
        results = EMPTY_ATHENA_RESPONSE
        next_token = None
    
        # Retrieve only a single page and return the next token for the caller to iterate the response.
        for page in response_iterator:
            if iterator_index > 0:
                if len(page['ResultSet']['Rows']) == 0:
                    next_token = None
                break
            next_token = page.get('NextToken')
            results = page
            iterator_index += 1
    
        # ... process and return results