I want to test some functions that work with asyncpg. If I run one test at a time, it works fine. But if I run several tests at once, every test except the first one crashes with the error asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress.
Tests:
@pytest.mark.asyncio
async def test_project_connection(superuser_id, project_id):
    data = element_data_random(project_id)
    element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]
    project_elements = (await db_projects_element_ids_get([project_id]))[project_id]
    assert element_id in project_elements

@pytest.mark.asyncio
async def test_project_does_not_exist(superuser_id):
    data = element_data_random(str(uuid.uuid4()))
    with pytest.raises(ObjectWithIdDoesNotExistError):
        await resolve_element_create(data=data, user_id=superuser_id)
All functions that work with the DB use the pool and look like this:
async def <some_db_func>(*args):
    pool = await get_pool()
    await pool.execute(...)  # or fetch/fetchrow/fetchval
How I get the pool:
db_pool = None

async def get_pool():
    global db_pool

    async def init(con):
        await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
        await con.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')

    if not db_pool:
        dockerfiles_dir = os.path.join(src_dir, 'dockerfiles')
        env_path = os.path.join(dockerfiles_dir, 'dev.env')
        try:
            # When code and DB are inside docker containers
            host = 'postgres-docker'
            socket.gethostbyname(host)
        except socket.error:
            # When code is on localhost, but DB is inside a docker container
            host = 'localhost'
        load_dotenv(dotenv_path=env_path)
        db_pool = await asyncpg.create_pool(
            database=os.getenv("POSTGRES_DBNAME"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            host=host,
            init=init
        )
    return db_pool
As far as I understand, when you run a query through the pool, asyncpg under the hood acquires a connection from the pool and runs the query on that connection. That suggests each query should get its own connection. However, this error still occurs, and it is raised when a single connection tries to handle two queries at the same time.
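To make that mental model concrete, here is a toy sketch of the borrow-and-return pattern I have in mind. It is not asyncpg's implementation: `ToyPool` and the `conn-N` strings are made up for illustration, and the "query" is only a sleep.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Toy stand-in for a connection pool; NOT asyncpg's implementation."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for n in range(size):
            self._free.put_nowait(f"conn-{n}")  # fake connections

    @asynccontextmanager
    async def acquire(self):
        conn = await self._free.get()  # wait until a connection is free
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # hand it back for the next caller

    async def execute(self, query):
        # Each call borrows one connection for the duration of one query.
        async with self.acquire() as conn:
            await asyncio.sleep(0)  # pretend to talk to the database
            return conn  # report which connection served the query


async def main():
    pool = ToyPool(size=2)
    # Two concurrent queries are served by two different connections.
    return await asyncio.gather(pool.execute("SELECT 1"), pool.execute("SELECT 2"))


used = asyncio.run(main())
```

In this model two concurrent queries never share a connection, which is why the error surprised me.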
Okay, thanks to @Adelin I realized that I need to run each asynchronous test synchronously. I'm new to asyncio, so I didn't understand it right away, but I found a solution.
It was:
@pytest.mark.asyncio
async def test_...(*args):
    result = await <some_async_func>
    assert result == expected_result
It became:
def test_...(*args):
    async def inner():
        result = await <some_async_func>
        assert result == expected_result
    asyncio.get_event_loop().run_until_complete(inner())