Tags: python, amazon-web-services, unit-testing, boto3, moto

Pytest with Moto, change the status of an athena query using the backend


I am using moto to test AWS functionality in my codebase. One of the issues I have run into is that, when testing Athena, the query status stays in "QUEUED" indefinitely, causing the test to fail or time out.

Here is the method to be tested:

import time
import boto3

class Athena:

    CLIENT = boto3.client("athena")

    class QueryError(Exception):
        """A class for exceptions related to queries."""

    @classmethod
    def execute_query(cls, query, result_location, check_status=True,
                      time_limit=10):
        """
        Execute a query in Athena.
        """

        _result_configuration = {"OutputLocation": result_location}

        _kwargs = {"QueryString": query,
                   "ResultConfiguration": _result_configuration}

        response = cls.CLIENT.start_query_execution(**_kwargs)
        query_id = response["QueryExecutionId"]

        if check_status:
            old_time = time.time()
            while True:
                status = cls.CLIENT.get_query_execution(
                    QueryExecutionId=query_id)
                status = status["QueryExecution"]["Status"]["State"]
                # Stop polling once the query reaches a terminal state
                if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
                    if status == "FAILED":
                        raise cls.QueryError("error")
                    break
                time.sleep(0.2)  # 200ms

                # Give up if the query is still queued after the time limit
                if time.time() - old_time > time_limit and status == "QUEUED":
                    raise cls.QueryError("time limit reached")

        return query_id

Here is the fixture passed into the test:

import boto3
import pytest  # needed for the @pytest.fixture decorator below
from moto.s3 import mock_s3

@pytest.fixture
def s3():
    with mock_s3():
        s3 = boto3.client("s3")
        yield s3

Here is the test (replace x with the name of the module that contains the method above):

import uuid
import boto3
import pytest
from moto.athena import mock_athena
from moto.s3 import mock_s3


@mock_s3
@mock_athena
def test_execute_query_check(s3):
    """
    Test for 'execute_query' (with status check)
    """
    from x import Athena  # replace x with the module that defines Athena

    CLIENT = s3
    bucket_name = "pytest." + str(uuid.uuid4())

    # Bucket creation
    bucket_config = {"LocationConstraint": "us-east-2"}
    CLIENT.create_bucket(Bucket=bucket_name,
                         CreateBucketConfiguration=bucket_config)
    waiter = CLIENT.get_waiter("bucket_exists")
    waiter.wait(Bucket=bucket_name)

    s3_location = f"s3://{bucket_name}/"
    query = "SELECT current_date, current_time;"
    query_id = Athena.execute_query(query, s3_location,
                                    check_status=True)
    assert query_id

This test fails because moto never moves the query past "QUEUED", while the method under test waits for a terminal state and raises an exception if the query is still queued when the time limit is reached.
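
The failure can be reproduced without AWS or moto. The following is a minimal sketch, not the original code: StuckClient is a hypothetical stand-in whose get_query_execution always reports "QUEUED" (the behaviour described above for the mocked Athena), and wait_for_query is a simplified, hypothetical helper that mirrors the polling loop in execute_query with a shorter time limit.

```python
import time


class QueryError(Exception):
    """Raised when a query fails or does not leave the queue in time."""


class StuckClient:
    """Hypothetical stand-in for a mocked client that never leaves QUEUED."""

    def get_query_execution(self, QueryExecutionId):
        return {"QueryExecution": {"Status": {"State": "QUEUED"}}}


def wait_for_query(client, query_id, time_limit=0.5, poll_interval=0.05):
    """Poll until the query reaches a terminal state or the time limit passes."""
    start = time.time()
    while True:
        response = client.get_query_execution(QueryExecutionId=query_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state == "FAILED":
            raise QueryError("error")
        if state in ("SUCCEEDED", "CANCELLED"):
            return state
        # Same condition as in execute_query: only a queued query times out
        if time.time() - start > time_limit and state == "QUEUED":
            raise QueryError("time limit reached")
        time.sleep(poll_interval)


try:
    wait_for_query(StuckClient(), "fake-id")
    outcome = "finished"
except QueryError as exc:
    outcome = str(exc)
```

Because the state never changes, the loop can only end through the time-limit branch, which is why the test needs a way to change the state from the outside.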

I would like to be able to do something like:

from moto.athena import athena_backends
athena_backends['us-east-2'].job_flows[query_id].state = "SUCCEEDED"

as was suggested in this issue: https://github.com/spulec/moto/issues/380

However, the "job_flows" attribute does not seem to exist anymore on the boto3 mapreduce backend, and I can't find a method to change the state explicitly. Ideally, I could do this somewhere in the test, manually changing the state of the query to simulate how it would behave with actual resources.


Solution

  • The state can be read and changed through the backend's executions mapping:

    athena_backends['us-east-2'].executions.get(query_id).status
    

    Sample code snippet

    from moto.athena import athena_backends
    query = "SELECT stuff"
    location = "s3://bucket-name/prefix/"
    database = "database"
    # Start the query (self.client is the Athena client of the surrounding test class)
    exec_id = self.client.start_query_execution(
       QueryString=query,
       QueryExecutionContext={"Database": database},
       ResultConfiguration={"OutputLocation": location},
    )["QueryExecutionId"]
    # Use the region the client is configured for as the backend key
    athena_backends['us-west-2'].executions.get(exec_id).status = "CANCELLED"
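
The snippet above sets the status after start_query_execution returns, but execute_query in the question starts the query and polls in a single call, so the test has no point at which to do that. One possible way around this, sketched below under assumptions, is to wrap the client's start_query_execution so the state is changed right after the query is created. FakeAthenaClient, force_state_on_start and set_state are hypothetical names used only for illustration, and the fake client replaces moto so the sketch runs on its own. With moto, set_state would contain the backend assignment from the answer; the layout of athena_backends may differ between moto versions, so check that line against the installed version.

```python
from types import SimpleNamespace
from unittest import mock


class FakeAthenaClient:
    """Hypothetical stand-in for the mocked client; new queries stay QUEUED."""

    def __init__(self):
        self.executions = {}

    def start_query_execution(self, **kwargs):
        query_id = f"query-{len(self.executions) + 1}"
        self.executions[query_id] = SimpleNamespace(status="QUEUED")
        return {"QueryExecutionId": query_id}

    def get_query_execution(self, QueryExecutionId):
        state = self.executions[QueryExecutionId].status
        return {"QueryExecution": {"Status": {"State": state}}}


def force_state_on_start(client, set_state, state="SUCCEEDED"):
    """Return a patcher that moves every newly started query to `state`."""
    original = client.start_query_execution

    def wrapper(**kwargs):
        response = original(**kwargs)
        set_state(response["QueryExecutionId"], state)
        return response

    return mock.patch.object(client, "start_query_execution", side_effect=wrapper)


client = FakeAthenaClient()


def set_state(query_id, state):
    # Assumption: with moto this body would be the line from the answer, e.g.
    # athena_backends['us-east-2'].executions.get(query_id).status = state
    client.executions[query_id].status = state


with force_state_on_start(client, set_state):
    query_id = client.start_query_execution(QueryString="SELECT 1")["QueryExecutionId"]
    response = client.get_query_execution(QueryExecutionId=query_id)
    final_state = response["QueryExecution"]["Status"]["State"]
```

In the test from the question, the same patcher could be applied to Athena.CLIENT around the call to Athena.execute_query, so the polling loop sees a terminal state on its first check.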