I am trying to read data from a MySQL database and load it in bulk into an Azure Synapse database using the pyodbc library and ODBC Driver 18. I've managed to do so, but the load into the Synapse DB takes a long time and I suspect it is very inefficient. Here is my attempt:
import mysql.connector
import pyodbc


def _connect_mysql(host_p, port_p, username_p, password_p):
    try:
        db = mysql.connector.connect(host=host_p, port=port_p, user=username_p,
                                     passwd=password_p, connection_timeout=1,
                                     auth_plugin='mysql_native_password')
        return db
    except Exception as e:
        print("Couldn't connect to mysql db. Error: %s" % (str(e)))
        exit()


bulk_size = 5000
mysql_host = "127.0.0.1"
mysql_port = 3306
mysql_username = 'root'
mysql_password = 'password'

con_mysql_db = _connect_mysql(mysql_host, mysql_port, mysql_username, mysql_password)
crsr_mysql_db = con_mysql_db.cursor()
crsr_mysql_db.execute('SELECT ID, VALUE, TIME FROM table.data;')

bulk_count = 0
with pyodbc.connect(
        'DRIVER={ODBC Driver 18 for SQL Server};SERVER=tcp:' + server
        + ';PORT=' + str(port) + ';DATABASE=' + database
        + ';UID=' + username + ';PWD=' + password) as con_dwh_db:
    con_dwh_db.autocommit = True
    with con_dwh_db.cursor() as crsr_dwh_db_crsr:
        crsr_dwh_db_crsr.fast_executemany = True
        while True:
            bulk_rows = crsr_mysql_db.fetchmany(bulk_size)
            if not bulk_rows:
                break
            bulk_count += 1
            print(f"Bulk number: {bulk_count}")
            crsr_dwh_db_crsr.executemany(
                "insert into synapse_table.data(ID, VALUE, TIME) values (?, ?, ?)",
                bulk_rows)
As mentioned above, this code does work, but it takes approximately 40 seconds just to load 1,000 rows. What am I doing wrong? What is a more efficient way to do this?
The easiest way is to use the Copy Data Tool in Azure Data Factory/Synapse.
If you want to load using Python, run in a Synapse notebook and use Spark with the Azure Synapse Dedicated SQL Pool Spark Connector instead of pyodbc.
Sticking with pyodbc, you can vastly improve your loading speed by sending the data as one JSON document in a string, then parsing and loading it on the server. See e.g.: Update SQL Server database using stored procedure with table as parameter using PYODBC in Python
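A minimal sketch of that JSON approach, assuming your table and column names (`synapse_table.data` with `ID`, `VALUE`, `TIME`) and assuming `OPENJSON` is available on your dedicated SQL pool; only the serialization helper here is tested, the SQL itself is an illustration:

```python
import json

def rows_to_json_payload(rows):
    """Serialize (ID, VALUE, TIME) tuples into a single JSON array string.
    TIME is stringified so datetime objects from MySQL survive serialization."""
    return json.dumps(
        [{"ID": r[0], "VALUE": r[1], "TIME": str(r[2])} for r in rows]
    )

# Hypothetical server-side parse-and-insert; column types are assumptions
# and should match your actual Synapse table definition.
INSERT_FROM_JSON = """
INSERT INTO synapse_table.data (ID, VALUE, TIME)
SELECT ID, VALUE, TIME
FROM OPENJSON(?)
WITH (ID INT '$.ID', VALUE FLOAT '$.VALUE', TIME DATETIME2 '$.TIME');
"""

# Inside your fetch loop, replace executemany with one execute per batch:
#   payload = rows_to_json_payload(bulk_rows)
#   crsr_dwh_db_crsr.execute(INSERT_FROM_JSON, payload)
```

This turns each 5000-row batch into a single round trip and a single server-side statement, instead of one prepared insert per row.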