I have a table of employees. Each employee can discuss certain products of the company with customers, i.e. we have a many-to-many relationship between employees and products. The schema is the following:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey

metadata_obj = MetaData()

cv_calls_table = Table(
    "employee",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("key_id", String, nullable=True),
)

cv_products_table = Table(
    "products",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("product", String),
)

call_has_product_table = Table(
    "employee_has_product",
    metadata_obj,
    Column("employee_id", Integer, ForeignKey("employee.id")),
    Column("product_id", Integer, ForeignKey("products.id")),
)
Assuming that I have a csv file with, say, a million rows, of the following form:
+-----------+--------+---------+
| Name      | key_id | product |
+-----------+--------+---------+
| John Doe  | xyz    | pears   |
| John Doe  | xyz    | apples  |
| Ann Smith | abc    | oranges |
| Ann Smith | abc    | apples  |
+-----------+--------+---------+
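(For testing, an equivalent data.csv with these four rows can be created with something like the sketch below; the real file has around a million rows, and the file name matches the one read in the answer.)

import pandas as pd

# small sample matching the table above, written to data.csv
pd.DataFrame({
    'Name': ['John Doe', 'John Doe', 'Ann Smith', 'Ann Smith'],
    'key_id': ['xyz', 'xyz', 'abc', 'abc'],
    'product': ['pears', 'apples', 'oranges', 'apples'],
}).to_csv('data.csv', index=False)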
What would be the best way to perform a bulk insert into my database using Pandas (for reading the CSV file) and SQLAlchemy?
Inserting the employee and product data into their tables is no problem. I am struggling to find a neat way to insert the corresponding primary keys of employee and products into the junction table call_has_product_table.

I am using a SQLite database.
I have tried using the SQLAlchemy ORM, but it seems that adding elements to a relationship can only be done individually for each employee, which is not feasible in terms of performance. I have therefore tried to set up the database with the Core. The code I used with the ORM approach was:
from sqlalchemy.orm import Session

# keep only one row per employee
employees = employee_data.drop_duplicates(subset=['Name', 'key_id']).reset_index(drop=True)

with Session(engine) as session:
    emply = []
    for i in employees.index:
        # access each employee
        employee_row = employees.iloc[i, :]
        # create the corresponding object
        employee_ = Employee(name=employee_row['Name'], key_id=employee_row['key_id'])
        # get the products associated with the currently selected employee
        prods_ = employee_data.loc[
            (employee_data.Name == employee_row['Name'])
            & (employee_data.key_id == employee_row['key_id']),
            'product',
        ].to_list()
        # create the corresponding Product objects
        products_ = [Product(product=prod_) for prod_ in prods_]
        # add the Product objects to the current employee
        employee_.products.extend(products_)
        emply.append(employee_)
    session.add_all(emply)
    session.commit()
where employee_data is the Pandas dataframe of the form of the above table.
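The Employee and Product classes in that snippet are the usual mapped classes over the tables above; roughly, as a sketch in SQLAlchemy 2.0 style (these class definitions are assumed and not part of the schema shown earlier):

from sqlalchemy.orm import DeclarativeBase, relationship

class Base(DeclarativeBase):
    metadata = metadata_obj

class Employee(Base):
    # mapped onto the existing employee table
    __table__ = cv_calls_table
    # many-to-many to Product through the junction table
    products = relationship("Product", secondary=call_has_product_table)

class Product(Base):
    __table__ = cv_products_table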
I could also solve the issue with Pandas by selecting the id's of employee and products, but I think there should be a better way, only using the capabilities of SQLAlchemy (or perhaps pure SQL).
You can insert the employee and products records first, then select them back to get the (auto-generated) primary keys. Merge those onto the employee_data dataframe, then populate your junction table:
import pandas as pd
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import MetaData, Table, Column, ForeignKey
from sqlalchemy.types import String, Integer

engine = create_engine('sqlite://')
metadata_obj = MetaData()

# Your table declarations here

metadata_obj.create_all(engine)

employee_data = pd.read_csv('data.csv').rename(columns={'Name': 'name'})

with engine.connect() as con:
    # Employees
    employees = employee_data[['name', 'key_id']].drop_duplicates()
    con.execute(cv_calls_table.insert().values(employees.to_dict('records')))

    # Products
    products = employee_data[['product']].drop_duplicates()
    con.execute(cv_products_table.insert().values(products.to_dict('records')))
    con.commit()

    # Get the auto-generated primary keys back
    tbl1 = pd.DataFrame(con.execute(cv_calls_table.select()).all()).rename(columns={'id': 'employee_id'})
    tbl2 = pd.DataFrame(con.execute(cv_products_table.select()).all()).rename(columns={'id': 'product_id'})
    employee_data = employee_data.merge(tbl1, on=['name', 'key_id'])
    employee_data = employee_data.merge(tbl2, on='product')

    # Employee / Product junction rows
    con.execute(call_has_product_table.insert().values(employee_data[['employee_id', 'product_id']].to_dict('records')))
    con.commit()
Output:
>>> employee_data
name key_id product employee_id product_id
0 John Doe xyz pears 1 1
1 John Doe xyz apples 1 2
2 Ann Smith abc apples 2 2
3 Ann Smith abc oranges 2 3
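Note that insert().values(records) renders one large multi-row VALUES statement; for a million-row file it may be preferable to pass the records as execution parameters instead, so SQLAlchemy and the driver can batch the inserts (executemany). A sketch of the same three inserts in that form, reusing the connection and dataframes from above:

# passing the parameter list as a second argument batches the inserts
# instead of rendering one huge VALUES clause
con.execute(cv_calls_table.insert(), employees.to_dict('records'))
con.execute(cv_products_table.insert(), products.to_dict('records'))
con.execute(
    call_has_product_table.insert(),
    employee_data[['employee_id', 'product_id']].to_dict('records'),
)
con.commit()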
Check with Pandas:
con = engine.connect()
>>> pd.read_sql_table('employee', con)
id name key_id
0 1 John Doe xyz
1 2 Ann Smith abc
>>> pd.read_sql_table('products', con)
id product
0 1 pears
1 2 apples
2 3 oranges
>>> pd.read_sql_table('employee_has_product', con)
employee_id product_id
0 1 1
1 1 2
2 2 2
3 2 3
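If you want to avoid pulling the generated keys back into Pandas at all, another option is to load the raw CSV rows into a staging table and let the database resolve both foreign keys with a single INSERT ... SELECT, once employee and products have been populated as above. A sketch, with an assumed staging table name:

from sqlalchemy import insert, select

# staging table holding the raw CSV rows (name is an assumption)
staging_table = Table(
    "staging_rows",
    metadata_obj,
    Column("name", String),
    Column("key_id", String),
    Column("product", String),
)
staging_table.create(engine)

with engine.connect() as con:
    # bulk-load the raw rows; pandas handles the insert
    employee_data[['name', 'key_id', 'product']].to_sql(
        'staging_rows', con, if_exists='append', index=False
    )

    # resolve both foreign keys inside the database
    resolved = (
        select(cv_calls_table.c.id, cv_products_table.c.id)
        .select_from(
            staging_table
            .join(
                cv_calls_table,
                (staging_table.c.name == cv_calls_table.c.name)
                & (staging_table.c.key_id == cv_calls_table.c.key_id),
            )
            .join(cv_products_table, staging_table.c.product == cv_products_table.c.product)
        )
    )
    con.execute(
        insert(call_has_product_table).from_select(['employee_id', 'product_id'], resolved)
    )
    con.commit()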