python-3.x asynchronous tornado pytest aiopg

Unit tests with pytest, Tornado and aiopg fail on any database query


I have a REST API running on Python 3.7 + Tornado 5, with PostgreSQL as the database, using aiopg with SQLAlchemy core (via the aiopg.sa binding). For the unit tests, I use py.test with pytest-tornado.

All the tests pass as long as no database query is involved; as soon as one is, I get this:

Runtime error: Task cb=[IOLoop.add_future..() at venv/lib/python3.7/site-packages/tornado/ioloop.py:719]> got Future attached to a different loop

The same code works fine outside the tests; the app has handled hundreds of requests so far.

This is part of an @auth decorator which will check the Authorization header for a JWT token, decode it and get the user's data and attach it to the request; this is the part for the query:

                partner_id = payload['partner_id']
                provided_scopes = payload.get("scope", [])
                for scope in scopes:
                    if scope not in provided_scopes:
                        logger.error(
                            'Authentication failed, scopes are not compliant - '
                            'required: {} - '
                            'provided: {}'.format(scopes, provided_scopes)
                        )
                        raise ForbiddenException(
                            "insufficient permissions or wrong user."
                        )
                db = self.settings['db']
                partner = await Partner.get(db, username=partner_id)
                # The user is authenticated at this stage, let's add
                # the user info to the request so it can be used
                if not partner:
                    raise UnauthorizedException('Unknown user from token')
                p = Partner(**partner)
                setattr(self.request, "partner_id", p.uuid)
                setattr(self.request, "partner", p)

The .get() async method from Partner comes from the Base class for all models in the app. This is the .get method implementation:

@classmethod
async def get(cls, db, order=None, limit=None, offset=None, **kwargs):
    """
    Get one instance that will match the criteria
    :param db:
    :param order:
    :param limit:
    :param offset:
    :param kwargs:
    :return:
    """
    if len(kwargs) == 0:
        return None
    if not hasattr(cls, '__tablename__'):
        raise InvalidModelException()
    tbl = cls.__table__
    instance = None
    clause = cls.get_clause(**kwargs)
    query = (tbl.select().where(text(clause)))
    if order:
        query = query.order_by(text(order))
    if limit:
        query = query.limit(limit)
    if offset:
        query = query.offset(offset)
    logger.info(f'GET query executing:\n{query}')
    try:
        async with db.acquire() as conn:
            async with conn.execute(query) as rows:
                instance = await rows.first()
    except DataError as de:
        [...]
    return instance

The .get() method above will either return a model instance (row representation) or None.

It uses the db.acquire() context manager, as described in aiopg's doc here: https://aiopg.readthedocs.io/en/stable/sa.html.

As described in this same doc, the sa.create_engine() method returns a connection pool, so db.acquire() just takes one connection from the pool. I share this pool across all requests in Tornado, which use it to run their queries when they need to.

So this is the fixture I've set up in my conftest.py:

@pytest.fixture
async def db():
    dbe = await setup_db()
    return dbe


@pytest.fixture
def app(db, event_loop):
    """
    Returns a valid testing Tornado Application instance.
    :return:
    """
    app = make_app(db)
    settings.JWT_SECRET = 'its_secret_one'
    return app

I can't find an explanation of why this is happening. Tornado's docs and source make it clear that the asyncio event loop is used by default, and by debugging I can see that the event loop is indeed the same one, but for some reason it seems to get closed or stopped abruptly.

This is one test that fails:

@pytest.mark.gen_test(timeout=2)
def test_score_returns_204_empty(app, http_server, http_client, base_url):
    score_url = '/'.join([base_url, URL_PREFIX, 'score'])
    token = create_token('test', scopes=['score:get'])
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }
    response = yield http_client.fetch(score_url, headers=headers, raise_error=False)
    assert response.code == 204

This test fails because it returns 401 instead of 204: the query in the auth decorator fails with the RuntimeError, which then results in an Unauthorized response.

Any idea from the async experts here will be very appreciated, I'm quite lost on this!!!


Solution

  • Well, after a lot of digging, testing and, of course, learning quite a lot about asyncio, I made it work myself. Thanks for the suggestions so far.

    The issue was that asyncio's event loop was not running. As @hoefling mentioned, pytest itself does not support coroutines, but pytest-asyncio adds that feature to your tests. This is very well explained here: https://medium.com/ideas-at-igenius/testing-asyncio-python-code-with-pytest-a2f3628f82bc

    So, without pytest-asyncio, your async code that needs to be tested will look like this:

    def test_this_is_an_async_test():
       loop = asyncio.get_event_loop()
       result = loop.run_until_complete(my_async_function(param1, param2, param3))
       assert result == 'expected'
    

    We use loop.run_until_complete() because otherwise the loop would never run: this is how asyncio works by default, and pytest does nothing to change it.
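    To make that point concrete, here is a minimal stdlib-only sketch. The my_async_function below is a hypothetical stand-in (it just adds its arguments), and run_sync is a helper name made up for this illustration. Calling the coroutine function only creates a coroutine object; nothing executes until a loop drives it.

```python
import asyncio


async def my_async_function(a, b, c):
    # Hypothetical stand-in for the real async code under test.
    await asyncio.sleep(0)
    return a + b + c


def run_sync(coro):
    # Drive a coroutine to completion on a fresh loop, then close the loop.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


# Calling the function does not run it; it only builds a coroutine object.
coro = my_async_function(1, 2, 3)
is_coroutine = asyncio.iscoroutine(coro)

# Only when a loop runs the coroutine do we get the actual result.
result = run_sync(coro)
```

    Here a fresh loop is created instead of calling asyncio.get_event_loop(), purely to keep the sketch independent of whatever loop state already exists.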

    With pytest-asyncio, your test works with the well-known async / await parts:

    async def test_this_is_an_async_test(event_loop):
       result = await my_async_function(param1, param2, param3)
       assert result == 'expected'
    

    In this case pytest-asyncio wraps the run_until_complete() call shown above (to put it very briefly), so the event loop runs and is available to your async code.

    Please note: the event_loop parameter in the second case is not even necessary; pytest-asyncio provides a loop for your test.
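    Roughly speaking, the idea can be sketched with a small decorator. This is not pytest-asyncio's actual implementation, only an illustration of the concept; run_on_fresh_loop, fetch_answer and check_async_function are hypothetical names invented for this example.

```python
import asyncio
import functools


def run_on_fresh_loop(test_coro_fn):
    # Turn an "async def" test into a plain function that a sync runner can call.
    @functools.wraps(test_coro_fn)
    def sync_test(*args, **kwargs):
        loop = asyncio.new_event_loop()
        try:
            # The wrapper is what actually gets the loop running.
            return loop.run_until_complete(test_coro_fn(*args, **kwargs))
        finally:
            loop.close()

    return sync_test


async def fetch_answer():
    # Hypothetical async code under test.
    await asyncio.sleep(0)
    return "expected"


@run_on_fresh_loop
async def check_async_function():
    result = await fetch_answer()
    assert result == "expected"
    return result


# The decorated function is now called like any ordinary synchronous function.
outcome = check_async_function()
```

    The test body keeps its async / await syntax, while the code that calls it never has to know about the loop.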

    On the other hand, when you are testing your Tornado app, you usually need an HTTP server up and running during your tests, listening on a well-known port, etc., so the usual approach is to write fixtures for an HTTP server, a base_url (usually http://localhost:, with an unused port), and so on.

    pytest-tornado is very useful here, as it provides several of these fixtures for you: http_server, http_client, unused_port, base_url, etc.

    It also provides the pytest.mark.gen_test() mark, which lets any standard test run as a coroutine via yield, and can even enforce a timeout, like this:

        @pytest.mark.gen_test(timeout=3)
        def test_fetch_my_data(http_client, base_url):
           result = yield http_client.fetch('/'.join([base_url, 'result']))
           assert len(result.body) == 1000
    

    But used this way it does not support async / await, and only Tornado's ioloop is available, via the io_loop fixture (although since Tornado 5.0 the ioloop uses asyncio underneath by default). So you need to combine both pytest.mark.gen_test and pytest.mark.asyncio, but in the right order! (Which I got wrong at first.)

    Once I understood better what could be the problem, this was the next approach:

        @pytest.mark.gen_test(timeout=2)
        @pytest.mark.asyncio
        async def test_score_returns_204_empty(http_client, base_url):
            score_url = '/'.join([base_url, URL_PREFIX, 'score'])
            token = create_token('test', scopes=['score:get'])
            headers = {
                'Authorization': f'Bearer {token}',
                'Accept': 'application/json',
            }
            response = await http_client.fetch(score_url, headers=headers, raise_error=False)
            assert response.code == 204
    

    But this is utterly wrong, if you understand how Python's decorator wrappers work. With the code above, pytest-asyncio's coroutine is wrapped in pytest-tornado's yield-based gen.coroutine, which won't get the event loop running, so my tests were still failing with the same problem. Every query to the database was returning a Future waiting for an event loop to be running.

    My updated code, once I realized the silly mistake:

        @pytest.mark.asyncio
        @pytest.mark.gen_test(timeout=2)
        async def test_score_returns_204_empty(http_client, base_url):
            score_url = '/'.join([base_url, URL_PREFIX, 'score'])
            token = create_token('test', scopes=['score:get'])
            headers = {
                'Authorization': f'Bearer {token}',
                'Accept': 'application/json',
            }
            response = await http_client.fetch(score_url, headers=headers, raise_error=False)
            assert response.code == 204
    

    In this case, the gen.coroutine is wrapped inside the pytest-asyncio coroutine, and the event_loop runs the coroutines as expected!
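    As a reminder of the general stacking rule, here is a generic sketch with made-up names (tag, body), unrelated to either plugin: the decorator nearest the function is applied first, and the topmost one ends up outermost. Keep in mind that pytest marks attach metadata rather than wrapping the function themselves, so this only illustrates plain Python decorators, not how the two plugins actually process their marks.

```python
def tag(label):
    # Build a decorator that records when its wrapper is entered and left.
    def decorator(func):
        def wrapper(calls):
            calls.append(f"enter {label}")
            result = func(calls)
            calls.append(f"exit {label}")
            return result

        return wrapper

    return decorator


@tag("outer")  # applied last, so it wraps everything below it
@tag("inner")  # applied first, directly around the function
def body(calls):
    calls.append("body")
    return calls


# Equivalent to: body = tag("outer")(tag("inner")(body))
trace = body([])
```

    The resulting trace lists the outer wrapper first and last, with the inner wrapper and the function body nested inside it.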

    But there was still a minor issue that took me a little while to notice, too: pytest-asyncio's event_loop fixture creates a new event loop for every test, while pytest-tornado also creates a new IOLoop. So the tests were still failing, but this time with a different error.
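    To see why two separate loops per test are a problem in general, here is a minimal stdlib-only sketch (not my application code; reproduce_loop_mismatch is a name made up for the illustration). A future created on one loop is awaited by a task running on another, and asyncio rejects it with the same "attached to a different loop" wording as in the question.

```python
import asyncio


def reproduce_loop_mismatch():
    # Two independent loops, standing in for the two loops created per test.
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # This future belongs to loop_a ...
        fut = loop_a.create_future()

        async def waiter():
            # ... but it is awaited from a task that runs on loop_b.
            return await fut

        try:
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return None
    finally:
        loop_a.close()
        loop_b.close()


message = reproduce_loop_mismatch()
```

    Any resource that creates futures on the loop it was set up with, such as a connection pool, can run into the same situation if requests are later served on a different loop.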

    The conftest.py file now looks like this. Please note that I've re-declared the event_loop fixture to reuse the loop from pytest-tornado's io_loop fixture (recall that pytest-tornado creates a new io_loop for each test function):

    @pytest.fixture(scope='function')
    def event_loop(io_loop):
        loop = io_loop.current().asyncio_loop
        yield loop
        loop.stop()
    
    
    @pytest.fixture(scope='function')
    async def db():
        dbe = await setup_db()
        yield dbe
    
    
    @pytest.fixture
    def app(db):
        """
        Returns a valid testing Tornado Application instance.
        :return:
        """
        app = make_app(db)
        settings.JWT_SECRET = 'its_secret_one'
        yield app
    

    Now all my tests work, I'm back a happy man and very proud of my now better understanding of the asyncio way of life. Cool!