I read data from Kafka and now need to load it into MSSQL Server.
I have tried a few methods.
The first method loads only 500 rows per minute:
while max_offset < end_offset - 1:
    msg = consumer.poll()
    max_offset += 1
    if msg is not None:
        if msg.error():
            print(f'Error while receiving message: {msg.error()}')
        else:
            value = msg.value()
            offset = msg.offset()
            try:
                cursor = conn_str.cursor()
                df = pd.json_normalize(value)
                for index, row in df.iterrows():
                    cursor.execute(
                        "INSERT INTO TABLE_NAME (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) "
                        "VALUES (?,?,?,?,?,?)",
                        row.column_1, row.column_2, row.column_3,
                        row.column_4, row.column_5, row.column_6)
                    conn_str.commit()
                print(f'Received message:\n{df}')
            except Exception as e:
                print(f'Error message: {e}')
    else:
        print('No messages received')
The second method I tried was:
messages_to_insert = []
batch_size = 10000
counter = 0
while max_offset < end_offset - 1:
    msg = consumer.poll()
    max_offset += 1
    if msg is not None:
        if msg.error():
            print(f'Error while receiving message: {msg.error()}')
        else:
            row = msg.value()
            offset = msg.offset()
            messages_to_insert.append((row["field1"], row["field2"], row["field3"],
                                       row["field4"], row["field5"], row["field6"]))
            counter += 1
            if counter >= batch_size:
                cursor.executemany(
                    "INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) VALUES (?,?,?,?,?,?)",
                    messages_to_insert)
                conn_str.commit()
                messages_to_insert = []
                counter = 0

# Flush the last, partially filled batch
if messages_to_insert:
    cursor.executemany(
        "INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) VALUES (?,?,?,?,?,?)",
        messages_to_insert)
    conn_str.commit()
This method was a little faster.
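One tweak that might make those executemany batches noticeably faster (a sketch, not something the code above already does): enabling pyodbc's fast_executemany on the cursor, so each 10,000-row batch is sent to SQL Server as a single parameter array instead of one round trip per row.

# Sketch: same batch insert as above, but with pyodbc's fast_executemany enabled
cursor = conn_str.cursor()
cursor.fast_executemany = True  # pyodbc option; works with the Microsoft ODBC Driver for SQL Server
cursor.executemany(
    "INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) VALUES (?,?,?,?,?,?)",
    messages_to_insert)
conn_str.commit()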
The last approach writes the data to a .csv file first. Writing to the .csv file is very fast, but loading it into MSSQL is still too slow:
messages_to_insert = []
batch_size = 10000
counter = 0
with open('data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['COLUMN_1', 'COLUMN_2', 'COLUMN_3', 'COLUMN_4', 'COLUMN_5', 'COLUMN_6'])
    while max_offset < end_offset - 1:
        msg = consumer.poll()
        max_offset += 1
        if msg is not None:
            if msg.error():
                logger.error(f'Error while receiving message: {msg.error()}')
            else:
                row = msg.value()
                offset = msg.offset()
                # Write row to file
                writer.writerow([row['field1'], row['field2'], row['field3'],
                                 row['field4'], row['field5'], row['field6']])

file_path = 'data.csv'
with open(file_path, 'r') as f:
    lines = f.readlines()
lines.pop(0)  # drop the header row
data = [tuple(line.strip().split(',')) for line in lines]

placeholders = ','.join('?' * len(data[0]))
query = f"INSERT INTO <table_name> (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) VALUES ({placeholders})"
cursor = conn_str.cursor()
cursor.executemany(query, data)
cursor.commit()
conn_str.close()
I also tried loading the .csv file with SSIS afterwards, but the load speed was the same.
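A possible alternative for the .csv route that I have not shown above: letting SQL Server ingest the file itself with T-SQL BULK INSERT. This is only a sketch and assumes the file path is readable by the SQL Server service account (i.e. the file sits on, or is shared with, the database server).

# Sketch: bulk-load the CSV on the server side instead of using executemany.
# The path is a placeholder and must be accessible to SQL Server itself.
cursor = conn_str.cursor()
cursor.execute("""
    BULK INSERT <table_name>
    FROM 'C:\\path\\to\\data.csv'
    WITH (FIELDTERMINATOR = ',', ROWTERMINATOR = '\\n', FIRSTROW = 2)
""")
conn_str.commit()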
The code below works fine for me:
# Inside the consumer loop: decoded_msg is the already decoded Kafka message
values_list.append(decoded_msg)
if len(values_list) == 10000:
    df = pd.DataFrame(values_list)
    column_order = ['col1', 'col2', 'col3']
    df = df.reindex(columns=column_order)
    with engine.connect() as connection:
        print(df)
        try:
            # Integer and String come from sqlalchemy (from sqlalchemy.types import Integer, String)
            df.to_sql('TABLE_NAME', con=engine, if_exists='append', index=False,
                      dtype={'COL1': Integer(), 'COL2': String(), 'COL3': String()},
                      schema='SCHEMA')
        except Exception as e:
            print("An error occurred while inserting data: ", e)
        connection.commit()
    values_list = []
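One detail that matters for the to_sql speed (the engine setup is not shown above, so this is an assumption about it): with the mssql+pyodbc dialect, creating the engine with fast_executemany=True makes pandas send each batch through pyodbc's array binding, which is usually much faster against SQL Server.

# Sketch: placeholder connection URL; only the fast_executemany flag is the point here.
from sqlalchemy import create_engine

engine = create_engine(
    "mssql+pyodbc://USER:PASSWORD@SERVER/DATABASE?driver=ODBC+Driver+17+for+SQL+Server",
    fast_executemany=True)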