Tags: python, mysql, sqlalchemy, pyramid

With an SQLAlchemy query, is there a way to iterate over all columns in a table?


I want to create a search function on my web page that is linked to a database with 5 tables.

The idea is that the user selects the table to search, enters a search term, and gets back a list of results.

Obviously I could do this with 5 if statements, but it seems like there should be an easier way.

This is what I have tried, but for some reason it doesn't work when I put the item and table variables into the query instead of the actual table and column names. It only works if I type the specific names in place of table and table.item.

    if 'form.submitted' in request.params:
        # search term
        search = request.params['body']
        # table to be searched
        table = request.params['table']
        list = eval(table).__table__.columns._data.keys()

        results = {}

        for item in list:
            try:
                searchdb = request.dbsession.query(table).filter(table.item == search).all()
                results[item] = searchdb
            except:
                continue

        return (results)

I want to know whether this is possible, or whether I should just give up and write 5 if statements.


Solution

  • Please don't use eval(); it is mostly evil. You cannot use it safely on user input, as shown in "Eval really is dangerous". Imagine an adversary who calls your API directly instead of using your form and sends you something along the lines of

    body=byebye&table=__import__("os").system("rm -rf /")
    

    Depending on your setup, you might lose your entire system, or the container, or what have you. And that is not the only thing they could do: they have the whole of Python's expression syntax to play with.
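
    To make this concrete, here is a minimal sketch with a deliberately harmless stand-in payload (the payload string is an assumption made for illustration; a real attacker would pick something far more destructive). It shows that eval() will import a module and call into it just because the string says so:

```python
import os

# Harmless stand-in for an attacker-controlled "table" parameter.
payload = '__import__("os").getpid()'

# eval() does not look up a name here: it imports os and calls a function.
leaked = eval(payload)

# The value really came from the os module, chosen by the "user".
print(leaked == os.getpid())
```

    Nothing in the string has to be the name of one of your models for eval() to run it.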

    In your case, a proper way to handle user-selected tables (models) is to use a lookup:

    the_5_tables = {
        "table_1": Table1,
        "table_2": Table2,
        # etc.
    }
    

    Then all you need to do is

    #table to be searched
    model = the_5_tables[request.params['table']]
    

    This has the added benefit of whitelisting what tables the user is able to use, even if the current scope has access to others.
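
    A plain the_5_tables[...] lookup raises KeyError when the parameter is missing or names an unknown table. As a rough sketch of one way to handle that, the helper below returns None instead; resolve_model is a hypothetical name, and Table1 and Table2 are empty stand-ins for the real mapped classes so that the snippet runs on its own:

```python
# Table1 and Table2 are hypothetical stand-ins for the real mapped classes.
class Table1:
    pass


class Table2:
    pass


the_5_tables = {
    "table_1": Table1,
    "table_2": Table2,
    # etc.
}


def resolve_model(params):
    """Return the whitelisted model for params['table'], or None."""
    # dict.get() returns None for a missing or unknown key instead of raising.
    return the_5_tables.get(params.get("table"))


print(resolve_model({"table": "table_1"}) is Table1)
print(resolve_model({"table": "not_a_table"}))
```

    In a Pyramid view, a None result could then be turned into an error response, for example by raising pyramid.httpexceptions.HTTPBadRequest, rather than letting the lookup fail with an unhandled exception.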

    Producing the filters is easier using the actual Column objects instead of their keys:

    results = {}
    
    for col in model.__table__.columns:
        try:
            searchdb = request.dbsession.query(model).filter(col == search).all()
            results[col.key] = searchdb
    
        except Exception as e:
            print(f"Unhandled error: {e}")
            continue
    
    return results
    

    Unfortunately, this makes as many round trips to the database as there are columns in your table. Now, I'm guessing that the bare except: was there to mask errors arising from type mismatches, for example when you search against a numeric column. You could inspect the columns further to avoid that:

    from sqlalchemy.types import String
    
    # ...
    
    results = {}
    
    for col in model.__table__.columns:
        if isinstance(col.type, String):
            searchdb = request.dbsession.query(model).filter(col == search).all()
            results[col.key] = searchdb
    
    return results
    

    Taking it even further, if you're not actually that interested in which columns matched, you could just form a single query such as:

    from sqlalchemy import literal
    from sqlalchemy.types import String
    
    # ...
    
    str_cols = [c for c in model.__table__.c if isinstance(c.type, String)]
    results = request.dbsession.query(model).filter(literal(search).in_(str_cols)).all()
    return results
    

    Though a bit hacky, it's still possible to also fetch which column(s) matched in a single query (this is not that useful; it's trivial to do the same in Python after the query):

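    from sqlalchemy import func, literal
    
    # ...
    
    # str_cols is the list of String columns built in the previous snippet.
    # IF() and CONCAT_WS() are MySQL functions; CONCAT_WS() skips NULLs.
    results = request.dbsession.query(
            model,
            func.concat_ws(",", *[
                func.if_(c == search, c.key, None)
                for c in str_cols
            ]).label("in_columns")).\
        filter(literal(search).in_(str_cols)).all()
    return results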
    

    The in_columns value will be a comma-separated string of the names of the columns that matched.
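
    As noted above, the same information is easy to recover in Python after the single-query version has run. A minimal sketch, assuming each mapped attribute has the same name as its column key: Row is a hypothetical stand-in for a model instance and matching_columns is an illustrative helper, not part of SQLAlchemy.

```python
from dataclasses import dataclass


# Hypothetical stand-in for one row (model instance) returned by the query.
@dataclass
class Row:
    name: str
    city: str
    age: int


def matching_columns(row, column_keys, search):
    """Return the keys of the attributes on row that equal search."""
    return [key for key in column_keys if getattr(row, key) == search]


# With a real model the keys would come from [c.key for c in str_cols].
str_col_keys = ["name", "city"]
rows = [Row("Paris", "Paris", 30), Row("Ann", "Paris", 41)]

for row in rows:
    print(row.name, matching_columns(row, str_col_keys, "Paris"))
```

    One caveat: Python's == is case-sensitive, while MySQL's default collations usually compare strings case-insensitively, so the two approaches may disagree on rows that differ from the search term only in case.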