The existing posts didn't provide a useful answer to me.
I'm trying to run asynchronous database tests using Pytest (db is Postgres with asyncpg), and I'd like to initialize my database using my Alembic migrations so that I can verify that they work properly at the same time.
My first attempt was this:
@pytest.fixture(scope="session")
async def tables():
    """Initialize a database before the tests, and then tear it down again"""
    alembic_config: config.Config = config.Config('alembic.ini')
    command.upgrade(alembic_config, "head")
    yield
    command.downgrade(alembic_config, "base")
which didn't actually do anything at all (migrations were never applied to the database, tables not created).
Both Alembic's documentation & Pytest-Alembic's documentation say that async migrations should be run by configuring your env.py like this:
async def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

asyncio.run(run_migrations_online())
but this doesn't resolve the issue (however it does work for production migrations outside of pytest).
I stumbled upon a library called pytest-alembic that provides some built-in tests for this.
When running pytest --test-alembic, I get the following exception:
got Future attached to a different loop
A few comments on pytest-asyncio's GitHub repository suggest that the following fixture might fix it:
@pytest.fixture(scope="session")
def event_loop() -> Generator:
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
but it doesn't (same exception remains).
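For reference, here is a minimal stdlib-only sketch of one common way to get that message: a Future created on one event loop is awaited by a task running on another. The loop and function names are made up for illustration, and I'm only assuming this resembles what happens between the engine and the test loop; it is not taken from pytest-alembic itself.

```python
import asyncio

# Two independent loops, standing in for "the loop some resource was created on"
# and "the loop the test runs on". Both names are illustrative.
setup_loop = asyncio.new_event_loop()
test_loop = asyncio.new_event_loop()

# A Future is bound to the loop that created it.
pending = setup_loop.create_future()


async def wait_for(fut):
    # Awaiting a Future from a task that runs on a different loop is rejected.
    await fut


try:
    test_loop.run_until_complete(wait_for(pending))
    message = None
except RuntimeError as exc:
    message = str(exc)
finally:
    setup_loop.close()
    test_loop.close()

print(message)
```

If something like this is the cause, the session-scoped event_loop fixture only helps when every async resource is actually created on that same loop.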
Next I tried to run the upgrade test manually, using:
async def test_migrations(alembic_runner):
    alembic_runner.migrate_up_to("revision_tag_here")
which gives me
alembic_runner.migrate_up_to("revision_tag_here")
venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264: in run_connection_task
    return asyncio.run(run(engine))
RuntimeError: asyncio.run() cannot be called from a running event loop
However, this is an internal call by pytest-alembic. I'm not calling asyncio.run() myself, so I can't apply any of the fixes found online (wrapping the call in try/except to check whether there is an existing event loop to use, etc.). I'm sure this isn't related to the asyncio.run() defined in my own Alembic env, because if I add a breakpoint, or just raise an exception above it, the line is never executed.
Lastly, I've also tried nest_asyncio.apply(), which just hangs forever.
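The RuntimeError from the traceback can be reproduced without any of the libraries involved. This is a minimal sketch, with hypothetical function names, of a synchronous helper that calls asyncio.run() while being invoked from a coroutine, which is roughly the situation an async test puts pytest-alembic's runner in:

```python
import asyncio


async def inner():
    return 42


def sync_helper():
    # Stand-in for a library function that calls asyncio.run() internally.
    coro = inner()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def outer():
    # Stand-in for an async test: a loop is already running at this point.
    try:
        sync_helper()
    except RuntimeError as exc:
        return str(exc)
    return None


message = asyncio.run(outer())
print(message)
```

Called from plain synchronous code, sync_helper() returns 42; the failure only appears once an event loop is already running in the same thread.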
A few more blog posts suggest using this fixture to initialize database tables for tests:
async with engine.begin() as connection:
    await connection.run_sync(Base.metadata.create_all)
which works for the purpose of creating a database to run tests against, but this doesn't run through the migrations so that doesn't help my case.
I feel like I've tried everything there is & visited every docs page, but I've had no luck so far. Running an async migration test surely can't be this difficult?
If any extra info is required I'm happy to provide it.
I got this up and running pretty easily with the following env.py. The main idea here is that the migration can be run synchronously.
import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine

import mymodel  # placeholder: import the module that defines your Base

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = mymodel.Base.metadata


def run_migrations_online():
    # A caller (e.g. a test) may hand over an already-open connection
    connectable = context.config.attributes.get("connection", None)

    if connectable is None:
        # No connection supplied: build an async engine from alembic.ini
        connectable = AsyncEngine(
            engine_from_config(
                context.config.get_section(context.config.config_ini_section),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
                future=True
            )
        )

    if isinstance(connectable, AsyncEngine):
        # CLI path: no event loop is running yet, so asyncio.run() is fine
        asyncio.run(run_async_migrations(connectable))
    else:
        # Supplied connection: run the migrations synchronously on it
        do_run_migrations(connectable)


async def run_async_migrations(connectable):
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )
    with context.begin_transaction():
        context.run_migrations()


run_migrations_online()
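To make the branching easier to see, here is a stripped-down stand-in with no Alembic or SQLAlchemy involved. Everything in it (the applied list, the string "connections", fixture_like_caller) is made up for illustration; it only mirrors the decision between calling asyncio.run() and staying synchronous when a connection was handed in.

```python
import asyncio

applied = []  # records which path ran; stands in for the real database


def do_run_migrations(connection):
    # Stand-in for context.configure(...) plus context.run_migrations()
    applied.append(f"migrated via {connection}")


async def run_async_migrations():
    # Stand-in for opening an async connection and calling run_sync
    do_run_migrations("fresh async connection")


def run_migrations_online(attributes):
    connection = attributes.get("connection")
    if connection is None:
        # CLI path: no loop is running, so asyncio.run() is safe
        asyncio.run(run_async_migrations())
    else:
        # Test path: reuse the caller's connection, no new event loop
        do_run_migrations(connection)


async def fixture_like_caller():
    # Invoked from inside a running loop, as an async fixture would be
    run_migrations_online({"connection": "connection from fixture"})


run_migrations_online({})             # like running `alembic upgrade head`
asyncio.run(fixture_like_caller())    # like the async pytest fixture
print(applied)
```

Because the second call never reaches asyncio.run(), it works from inside a running loop, which is the property the env.py above relies on.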
Then I added a simple db init script, init_db.py:
from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine

__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"

cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)


async def migrate_db(conn_url: str):
    async_engine = create_async_engine(conn_url, echo=True)
    async with async_engine.begin() as conn:
        await conn.run_sync(__execute_upgrade)


def __execute_upgrade(connection):
    # Hand the open (sync-facing) connection to env.py via cfg.attributes
    cfg.attributes["connection"] = connection
    command.upgrade(cfg, "head")
Then your pytest fixture in conftest.py can look like this:
...

@pytest_asyncio.fixture(autouse=True)
async def migrate():
    await migrate_db(conn_url)
    yield

...
Note: I don't scope my migrate fixture to the test session, I tend to drop and migrate after each test.