Search code examples
pythonpostgresqlsqlalchemymigrationalembic

alembic not generating tables in schema


I have a docker setup consisting of three containers. A) uvicorn/fastAPI app B) mongoDB C) Postgres DB

All three working. This is a problem with alembic, hosted in container A with the rest of my app, making changes to postgres in container C.

I have a DatabaseSession class that connects to the DB. I can connect just fine and run commands. It does seem to be responding. Here is the connect method within that class:

def _connect_db(self, db_schema_override: str = None):
        schema = self.schema if not db_schema_override else db_schema_override
        try:
            engine = create_engine(
                self.db_url, connect_args={"options": "-csearch_path={}".format(schema)}
            )
            SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
            session = SessionLocal()
            metadata = MetaData()
            metadata.reflect(bind=engine)
            metadata.create_all(engine)
            return engine, session
        except (AttributeError, ValueError):
            raise

here is my alembic migrations file

def upgrade_db(db_schema: str, revision: str = "head") -> None:
    db = DatabaseSession(db_schema)
    db.db_data.maintenance_mode = True
    db.db_data.save()
    _config = config.Config("path/to/file/alembic.ini")
    _config.set_main_option("sqlalchemy.url", db.db_url)
    with db.engine.begin() as cnxn:
        _config.attributes["connection"] = cnxn
        _config.attributes["schema"] = db_schema.lower()
        command.upgrade(_config, revision)
        db.db_data.maintenance_mode = False
        db.db_data.save()

here is my alembic env.py file run_migrations_online function:

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    db_schema = config.attributes.get("schema", "public")

    with connectable.connect() as connection:
        connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{db_schema}"'))
        connection.execute(text(f"SET search_path TO '{db_schema}'"))
        connection.dialect.default_schema_name = db_schema

        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_schemas=True,
        )

        with context.begin_transaction():
            context.run_migrations()

as you can see, Im utilizing schemas within each database to further seperate concerns. Im basically using mongodb to store DB records including database name and schema. Ive checked permissions on postgres, i can log into postgres and run commands as the same user and create tables.

I can access postgres from python using my DatabaseSession class and run commands as that same user. But I cannot seem to create anything.

When i run a 'create table' command using my DatabaseSession class from python interpreter, it registers the command was sent. But if i run \dt or anything in the psql CLI in another window, no changes show up. If i try to rerun the 'create table' command using the DatabaseSession class again, itll this time say this table already exists.

Its like its not committing the command. If i kill the session, or do a rollback, I can rerun the command just fine once. Am i supposed to be commiting this?

ive also tried just running alembic upgrade head. Same issue. It says it ran all the migrations just fine. no errors. but no changes persist in the DB.

additionally, Alembic does not appear to be creating the versions table where it houses the migration ID.

ive done this EXACT same setup in the past (earlier version of sqlalchemy). Im not exactly sure whats so different now that i cant seem to get this work.

Summary

  • using database/schema setup
  • can connect and run commands as user within schema via CLI
  • can connect and run commands from python. but nothing persists
  • no alembic version table exists within my database/schema

any help is greatly appreciated. thank you


Solution

  • My 'event' example below appears to be working but you're right. Your example just seems to rollback when you set echo=True on the engine.

    Your example doesn't seem to work in SqlAlchemy>=2 only in SqlAlchemy<2.

    After thinking about this it seems if you add connection.commit() after you set the dialect.default_schema_name = db_schema then your code does seem to work in sqlalchemy 2.

    I created a new alembic project, set the sqlalchemy url to a test postgresql db.

    Then I:

    1. rm any versions
    2. drop the db
    3. create the db
    4. run alembic revision --autogenerate
    5. run alembic upgrade head

    Your version seems to rollback.

    I lifted the @event from setting-alternate-search-paths-on-connect.

    from logging.config import fileConfig
    
    from sqlalchemy import engine_from_config
    from sqlalchemy import pool
    from sqlalchemy.sql import text
    
    from alembic import context
    
    # this is the Alembic Config object, which provides
    # access to the values within the .ini file in use.
    config = context.config
    config.attributes['schema'] = 'myschema'
    
    # Interpret the config file for Python logging.
    # This line sets up loggers basically.
    if config.config_file_name is not None:
        fileConfig(config.config_file_name)
    
    # add your model's MetaData object here
    # for 'autogenerate' support
    # from myapp import mymodel
    # target_metadata = mymodel.Base.metadata
    from sqlalchemy import MetaData
    from sqlalchemy import Table, Column, Integer
    
    metadata = MetaData()
    users_t = Table(
        "users",
        metadata,
        Column("user_id", Integer, primary_key=True),
    )
    
    target_metadata = metadata
    
    # other values from the config, defined by the needs of env.py,
    # can be acquired:
    # my_important_option = config.get_main_option("my_important_option")
    # ... etc.
    
    
    def run_migrations_offline() -> None:
        """Run migrations in 'offline' mode.
    
        This configures the context with just a URL
        and not an Engine, though an Engine is acceptable
        here as well.  By skipping the Engine creation
        we don't even need a DBAPI to be available.
    
        Calls to context.execute() here emit the given string to the
        script output.
    
        """
        url = config.get_main_option("sqlalchemy.url")
        context.configure(
            url=url,
            target_metadata=target_metadata,
            literal_binds=True,
            dialect_opts={"paramstyle": "named"},
        )
    
        with context.begin_transaction():
            context.run_migrations()
    
    
    def run_migrations_online() -> None:
        """Run migrations in 'online' mode.
    
        In this scenario we need to create an Engine
        and associate a connection with the context.
    
        """
        connectable = engine_from_config(
            config.get_section(config.config_ini_section, {}),
            prefix="sqlalchemy.",
            poolclass=pool.NullPool,
            echo=True,
        )
        db_schema = config.attributes.get("schema", "public")
    
        # Toggle this between 'event' and 'inline'
        schema_strategy = 'inline'
    
        if schema_strategy == 'event':
            from sqlalchemy import event
            @event.listens_for(connectable, "connect", insert=True)
            def set_search_path(dbapi_connection, connection_record):
                existing_autocommit = dbapi_connection.autocommit
                dbapi_connection.autocommit = True
                cursor = dbapi_connection.cursor()
                cursor.execute(f'CREATE SCHEMA IF NOT EXISTS {db_schema}')
                cursor.execute(f"SET SESSION search_path='{db_schema}'")
                cursor.close()
                dbapi_connection.autocommit = existing_autocommit
    
        with connectable.connect() as connection:
    
            if schema_strategy == 'inline':
                connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{db_schema}"'))
                connection.execute(text(f"SET search_path TO '{db_schema}'"))
                connection.dialect.default_schema_name = db_schema
    
            context.configure(
                connection=connection, target_metadata=target_metadata,
                include_schemas=True,
            )
            with context.begin_transaction() as trans:
                context.run_migrations()
    
    
    if context.is_offline_mode():
        run_migrations_offline()
    else:
        run_migrations_online()