Tags: python, pandas, sqlite, sqlalchemy

Bulk insert into tables with a many-to-many relationship using SQLAlchemy


I have a table of employees. Each employee can discuss certain products of the company with customers, i.e. we have a many-to-many relationship between employees and products. The schema is the following:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey

metadata_obj = MetaData()

cv_calls_table = Table(
    "employee",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("key_id", String, nullable=True),
)

cv_products_table = Table(
    "products",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("product", String),
)

call_has_product_table = Table(
    "employee_has_product",
    metadata_obj,
    Column("employee_id", Integer, ForeignKey("employee.id")),
    Column("product_id", Integer, ForeignKey("products.id")),
)

Assuming that I have a csv file with, say, a million rows, of the following form:

+-----------+-----------+----------+
| Name      | key_id    | product  |
+-----------+-----------+----------+
| John Doe  | xyz       |pears     |
+-----------+-----------+----------+
| John Doe  | xyz       |apples    |
+-----------+-----------+----------+
| Ann Smith | abc       |oranges   |
+-----------+-----------+----------+
| Ann Smith | abc       |apples    |
+-----------+-----------+----------+

What would be the best way to perform a bulk insert into my database, using Pandas (for reading the csv file) and SQLAlchemy? Inserting the employee and product data into their respective tables is no problem. What I am struggling with is finding a neat way to insert the primary keys of employee and product into the junction table call_has_product_table.

I am using a SQLite database.

I have tried using the SQLAlchemy ORM, but it seems that adding elements to a relationship can only be done individually for each employee, which is not feasible in terms of performance. I have therefore tried to set up the database with the Core instead. The code I used for the ORM approach was:

from sqlalchemy.orm import Session

# Employee and Product are the ORM-mapped classes for the tables above

# Keep only one row per employee
employees = employee_data.drop_duplicates(subset=['Name', 'key_id']).reset_index(drop=True)

with Session(engine) as session:
    emply = []
    for i in employees.index:

        # Access each employee
        employee_row = employees.iloc[i, :]

        # Create the corresponding object
        employee_ = Employee(name=employee_row['Name'], key_id=employee_row['key_id'])

        # Get the products associated with the currently selected employee
        prods_ = employee_data.loc[
            (employee_data.Name == employee_row['Name'])
            & (employee_data.key_id == employee_row['key_id']),
            'product'
        ].to_list()

        # Create the corresponding Product objects
        products_ = [Product(product=prod_) for prod_ in prods_]

        # Add the Product objects to the current employee
        employee_.products.extend(products_)

        emply.append(employee_)

    session.add_all(emply)
    session.commit()  # the context manager closes the session afterwards

where employee_data is the Pandas dataframe holding the csv data in the form of the table above.

I could also solve the issue with Pandas by selecting the ids of the employees and products, but I think there should be a better way that uses only the capabilities of SQLAlchemy (or perhaps pure SQL).


Solution

  • You can insert the employees and products first to create the records, then select them back to obtain the auto-generated primary keys. Merge those keys onto the employee_data dataframe, then populate the junction table:

    import pandas as pd

    from sqlalchemy.engine import create_engine
    from sqlalchemy.schema import MetaData, Table, Column, ForeignKey
    from sqlalchemy.types import String, Integer
    
    engine = create_engine('sqlite://')
    metadata_obj = MetaData()
    
    # Your three table declarations from above go here
    
    metadata_obj.create_all(engine)
    
    employee_data = pd.read_csv('data.csv').rename(columns={'Name': 'name'})
    
    with engine.connect() as con:
        # Employees
        employees = employee_data[['name','key_id']].drop_duplicates()
        con.execute(cv_calls_table.insert().values(employees.to_dict('records')))
        
        # Products
        products = employee_data[['product']].drop_duplicates()
        con.execute(cv_products_table.insert().values(products.to_dict('records')))
    
        con.commit()
        
        # Get Primary Keys
        tbl1 = pd.DataFrame(con.execute(cv_calls_table.select()).all()).rename(columns={'id': 'employee_id'})
        tbl2 = pd.DataFrame(con.execute(cv_products_table.select()).all()).rename(columns={'id': 'product_id'})
    
        employee_data = employee_data.merge(tbl1, on=['name', 'key_id'])
        employee_data = employee_data.merge(tbl2, on='product')
        
        # Employee / Product
        con.execute(call_has_product_table.insert().values(employee_data[['employee_id', 'product_id']].to_dict('records')))
        con.commit()
    

    Output:

    >>> employee_data
            name key_id  product  employee_id  product_id
    0   John Doe    xyz    pears            1           1
    1   John Doe    xyz   apples            1           2
    2  Ann Smith    abc   apples            2           2
    3  Ann Smith    abc  oranges            2           3
    
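    A note on scale: with a million rows, insert().values(...) renders one very large multi-VALUES statement. A variant that usually copes better with large batches is to pass the list of dicts as a separate argument to execute(), which lets SQLAlchemy use its executemany path. A minimal sketch of that variant, reusing the tables and dataframes from above:

    with engine.connect() as con:
        # The statement is rendered once; the parameter dicts are batched
        con.execute(cv_calls_table.insert(), employees.to_dict('records'))
        con.execute(cv_products_table.insert(), products.to_dict('records'))
        con.commit()
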

    Check with Pandas:

    con = engine.connect()
    
    >>> pd.read_sql_table('employee', con)
       id       name key_id
    0   1   John Doe    xyz
    1   2  Ann Smith    abc
    
    >>> pd.read_sql_table('products', con)
       id  product
    0   1    pears
    1   2   apples
    2   3  oranges
    
    >>> pd.read_sql_table('employee_has_product', con)
       employee_id  product_id
    0            1           1
    1            1           2
    2            2           2
    3            2           3
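
    If you would rather lean on pure SQL (as mentioned in the question), another option is to stage the raw csv rows in a scratch table and let SQLite resolve the foreign keys itself with INSERT ... SELECT. A sketch of that idea, reusing the engine from above; the staging table name 'staging' is made up for illustration:

    import pandas as pd
    from sqlalchemy import text

    raw = pd.read_csv('data.csv').rename(columns={'Name': 'name'})

    with engine.connect() as con:
        # 'staging' is a hypothetical scratch table holding the raw csv rows
        raw.to_sql('staging', con, if_exists='replace', index=False)

        # Distinct employees and products straight from the staging table
        con.execute(text(
            "INSERT INTO employee (name, key_id) "
            "SELECT DISTINCT name, key_id FROM staging"
        ))
        con.execute(text(
            "INSERT INTO products (product) "
            "SELECT DISTINCT product FROM staging"
        ))

        # Junction table: join the staging rows back to both parents
        con.execute(text(
            "INSERT INTO employee_has_product (employee_id, product_id) "
            "SELECT e.id, p.id FROM staging s "
            "JOIN employee e ON e.name = s.name AND e.key_id = s.key_id "
            "JOIN products p ON p.product = s.product"
        ))
        con.commit()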