Search code examples
python mysql sqlalchemy orm

How to get a maximum value of a column for several ranges of values of another column, in a single query, with SQLAlchemy ORM?


I have a MySQL database table with a date-time column and an associated data column. I need the maximum values of the data column for multiple pre-defined date-time ranges. Let's say I have a row for every 5 minutes and I need the maximum value of the data column for each consecutive 12-hour range. I need to do it in one query, and I need it done with SQLAlchemy's ORM functions.

I've found this answer which does what I need. However, I struggled to implement it using SQLAlchemy's ORM approach. Specifically, the parentheses with the subquery using union_all is where my understanding of SQLAlchemy is lacking.


Solution

  • I think you may need to apply subquery(), but please include the code you have tried. Here is a version that seems to work. I don't know a lot about datetime handling in MySQL, and datetime handling can get really complicated, so it is better to research that separately. This uses the "Core" and "ORM" layers together in SQLAlchemy 2+ by utilizing select().

    I roll back the transaction at the end of this example so that I can run the example over and over again.

    from datetime import datetime
    from sqlalchemy import (
        Integer,
        String,
        DateTime,
        ForeignKey,
        UniqueConstraint,
    )
    from sqlalchemy.schema import (
        Column,
    )
    from sqlalchemy.orm import backref, relationship, declarative_base, Session
    from sqlalchemy import create_engine, MetaData, Column, ForeignKey, Integer, String
    from sqlalchemy.sql import select, union_all, literal_column, func, and_, bindparam
    
    # DB_USER, DB_PASS, DB_IP and DB_PORT are not defined in this snippet and
    # must be supplied before it runs. echo=True makes the engine log the SQL
    # it emits.
    engine = create_engine(
        f'mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_IP}:{DB_PORT}',
        echo=True,
    )

    # Every table mapped on Base is qualified with the "testdb" schema.
    schema_metadata = MetaData(schema="testdb")
    Base = declarative_base(metadata=schema_metadata)
    
    from sqlalchemy import Column, String, SMALLINT, column
    from sqlalchemy.dialects.mysql import SMALLINT, TINYINT, BIGINT, CHAR
    
    class Sample(Base):
        """A single recorded measurement: a timestamp and the value seen then."""

        __tablename__ = "samples"

        # Auto-incrementing primary key.
        id = Column(BIGINT(20), nullable=False, autoincrement=True, primary_key=True)
        # Moment the value was recorded; the query compares it to each range.
        recorded_on = Column(DateTime, nullable=False)
        # The measured value; the query takes its maximum per range.
        recorded_value = Column(BIGINT(20), nullable=False)
    
    
    # Create the tables registered on Base's metadata (the Sample mapping
    # above) using the configured engine.
    Base.metadata.create_all(engine)
    
    
    # Twenty-four consecutive one-hour ranges covering 2023-10-11. Each range
    # runs from the start of an hour (minute=0, second=0) to the start of the
    # next hour, so membership is tested as >= range1 and < range2. The final
    # range ends at midnight of the following day.
    _HOUR_BOUNDARIES = [datetime(2023, 10, 11, hour) for hour in range(24)]
    _HOUR_BOUNDARIES.append(datetime(2023, 10, 12))
    RECORDED_ON_RANGES = list(zip(_HOUR_BOUNDARIES, _HOUR_BOUNDARIES[1:]))
    
    
    def get_test_recorded_value(hour, minute):
        """Return the fixture value for a sample taken at ``hour:minute``.

        A few hours get distinctive highs and lows so the per-range maximum
        is easy to recognise:

        * hour 12: 100 on even minutes, 3 on odd minutes
        * hour 6: always 3
        * hour 13: 4 on minutes divisible by three
        * anything else: 1
        """
        if hour == 12:
            return 3 if minute % 2 else 100
        if hour == 6:
            return 3
        if hour == 13 and not minute % 3:
            return 4
        return 1
    
    def get_test_samples():
        """Lazily produce one Sample per minute for hours 0 to 19 of 2023-10-11.

        Hours 20 to 23 are intentionally left without samples so that those
        ranges exercise the LEFT OUTER JOIN. Samples are produced minute by
        minute, cycling through every hour for each minute.
        """
        for minute in range(60):
            for hour in range(20):
                yield Sample(
                    recorded_on=datetime(2023, 10, 11, hour, minute, 0),
                    recorded_value=get_test_recorded_value(hour, minute),
                )
    
    
    # Run everything inside one explicit transaction that is rolled back at the
    # end, so the example can be executed repeatedly.
    with Session(engine) as session, session.begin():
        # Stage the fixture rows and flush them so the SELECT below sees them.
        session.add_all(get_test_samples())
        session.flush()

        # Express the ranges as a UNION ALL of one-row SELECTs of bound
        # datetimes. Only the first SELECT labels its columns, like the example.
        first_start, first_end = RECORDED_ON_RANGES[0]
        labelled_select = select(
            bindparam(None, value=first_start, type_=DateTime).label("range1"),
            bindparam(None, value=first_end, type_=DateTime).label("range2"),
        )
        unlabelled_selects = [
            select(
                bindparam(None, value=start, type_=DateTime),
                bindparam(None, value=end, type_=DateTime),
            )
            for start, end in RECORDED_ON_RANGES[1:]
        ]
        # subquery() allows the union's columns to be referenced afterwards.
        range_union = labelled_select.union_all(*unlabelled_selects).subquery()

        # Outer-join the samples onto each range (start inclusive, end
        # exclusive) and take the maximum recorded value per range.
        range_start = range_union.c.range1
        range_end = range_union.c.range2
        falls_in_range = and_(
            Sample.recorded_on >= range_start,
            Sample.recorded_on < range_end,
        )
        q = (
            select(range_start, range_end, func.max(Sample.recorded_value))
            .outerjoin(Sample, falls_in_range)
            .group_by(range_start, range_end)
        )

        for start, end, peak in session.execute(q):
            print(start, end, peak)

        # Discard the inserted fixture rows.
        session.rollback()