Search code examples
pythonsqlpostgresqlsqlalchemyorm

SQLAlchemy 2.0 issue with foreign keys and relationships


I am using async postgres session and running into an issue with inserting and pulling data between 2 of my tables.

The customers table is a stand alone table that doesnt have a relationship with other tables. THis table gets updated independently by another insert.

For the quotes table, I need to be able to insert the quote with the corresponding customer ID (either the unique customer number or the id column in the customer table). When selecting data, I want to join to the customers table and pull the data back nested like the below.

class Customers(Base):
    __tablename__ = "customers"
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


class Quotes(Base):
    __tablename__ = "quotes"

    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    customer = relationship(Customers, foreign_key[Customers.unique_account_number])
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)

Expected json insert into the quotes table

{
  "origin": "Ney York City",
  "destination": "Houston",
  "unique_account_number": "A9457HDA"
}

expected json from a select statement on the quotes table.

{
  "origin": "string",
  "destination": "string",
  "customer": { 
    "unique_account_number": "ABCD1234",
    "customer_name": "Customer LLC"

}

How can I achieve this through using the ORM?


Solution

  • import sys
    import asyncio
    import datetime
    import json
    
    from sqlalchemy import MetaData, select, UniqueConstraint, Integer, DateTime
    from sqlalchemy.sql import update, func
    
    from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship, joinedload
    from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
    
    
    class Base(AsyncAttrs, DeclarativeBase):
        pass
    
    
    class Customers(Base):
        __tablename__ = "customers"
        __table_args__ = (UniqueConstraint("unique_account_number"),)
        id = mapped_column(Integer, primary_key=True)
        customer_name: Mapped[str]
        unique_account_number: Mapped[str]
        updated_datetime = mapped_column(
            DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
        )
        created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
    
    
    class Quotes(Base):
        __tablename__ = "quotes"
    
        id = mapped_column(Integer, primary_key=True)
        origin: Mapped[str]
        destination: Mapped[str]
        unique_account_number: Mapped[str]
        customer = relationship(Customers,
                                primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
                                uselist=False,
                                foreign_keys=[Customers.unique_account_number])
        updated_datetime = mapped_column(
            DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
        )
        created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
    
    
    
    async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
        async with async_session() as session:
            async with session.begin():
                session.add(Quotes(
                    origin="Ney York City",
                    destination="Houston",
                    unique_account_number="A9457HDA"))
                session.add(Customers(
                    customer_name="Customer LLC",
                    unique_account_number="A9457HDA"))
    
    async def select_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
        async with async_session() as session:
            async with session.begin():
                stmt = select(Quotes).options(joinedload(Quotes.customer))
                quotes = (await session.execute(stmt)).unique().scalars().all()
                return quotes
    
    
    def get_engine():
        username, password, db = sys.argv[1:4]
        return create_async_engine(
            f"postgresql+asyncpg://{username}:{password}@/{db}",
            echo=True,
        )
    
    
    def serialize_quote(quote):
        customer_dict = {attr: getattr(quote.customer, attr) for attr in ["unique_account_number", "customer_name"]}
        quote_dict = {attr: getattr(quote, attr) for attr in ["origin", "destination"]}
        quote_dict["customer"] = customer_dict
        return quote_dict
    
    async def async_main() -> None:
    
        engine = get_engine()
    
        async_session = async_sessionmaker(engine, expire_on_commit=False)
    
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    
        await insert_objects(async_session)
    
        quotes = (await select_objects(async_session))
        print ([json.dumps(serialize_quote(quote)) for quote in quotes])
    
        await engine.dispose()
    
    
    asyncio.run(async_main())
    
    ``