Search code examples
pythonpytestasyncpg

cannot perform operation: another operation is in progress in pytest


I want to test some function, that work with asyncpg. If I run one test at a time, it works fine. But if I run several tests at a time, all tests except the first one crash with the error asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress.

Tests:

@pytest.mark.asyncio
async def test_project_connection(superuser_id, project_id):
    data = element_data_random(project_id)

    element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]
    project_elements = (await db_projects_element_ids_get([project_id]))[project_id]

    assert element_id in project_elements


@pytest.mark.asyncio
async def test_project_does_not_exist(superuser_id):
    data = element_data_random(str(uuid.uuid4()))

    with pytest.raises(ObjectWithIdDoesNotExistError):
        await resolve_element_create(data=data, user_id=superuser_id)

All functions for work with db use pool look like:

async def <some_db_func>(*args):
    pool = await get_pool()

    await pool.execute(...) # or fetch/fetchrow/fetchval

How I get the pool:

db_pool = None


async def get_pool():
    global db_pool

    async def init(con):
        await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
        await con.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')

    if not db_pool:
        dockerfiles_dir = os.path.join(src_dir, 'dockerfiles')
        env_path = os.path.join(dockerfiles_dir, 'dev.env')

        try:
            # When code and DB inside docker containers
            host = 'postgres-docker'
            socket.gethostbyname(host)
        except socket.error:
            # When code on localhost, but DB inside docker container
            host = 'localhost'

        load_dotenv(dotenv_path=env_path)

        db_pool = await asyncpg.create_pool(
            database=os.getenv("POSTGRES_DBNAME"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            host=host,
            init=init
        )  

    return db_pool

As far as I understand under the hood, asynсpg creates a new connection and runs the request inside that connection if you run the request through pool. Which makes it clear that each request should have its own connection. However, this error occurs, which is caused when one connection tries to handle two requests at the same time


Solution

  • Okay, thanks to @Adelin I realized that I need to run each asynchronous test synchronously. I I'm new to asyncio so I didn't understand it right away and found a solution.

    It was:

    @pytest.mark.asyncio
    async def test_...(*args):
        result = await <some_async_func>
    
        assert result == excepted_result
    

    It become:

    def test_...(*args):
        async def inner()
            result = await <some_async_func>
    
            assert result == excepted_result
    
        asyncio.get_event_loop().run_until_complete(inner())