Tags: python, sql-server, apache-kafka, pyodbc

Fast way to load data from Kafka to MSSQL


I have read data from Kafka, and now I need to load it into an MSSQL server.

I have tried a few methods.

This method loads only about 500 rows per minute, because every row is inserted and committed individually:

import pandas as pd

# consumer, conn_str, max_offset and end_offset are set up earlier in the script.
while max_offset < end_offset - 1:
    msg = consumer.poll()
    max_offset += 1
    if msg is not None:
        if msg.error():
            print(f'Error while receiving message: {msg.error()}')
        else:
            value = msg.value()
            offset = msg.offset()
            try:
                cursor = conn_str.cursor()
                df = pd.json_normalize(value)
                # One INSERT and one commit per row -- a full round trip
                # to the server for every single row.
                for index, row in df.iterrows():
                    cursor.execute(
                        "INSERT INTO TABLE_NAME (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) values(?,?,?,?,?,?)",
                        row.column_1, row.column_2, row.column_3,
                        row.column_4, row.column_5, row.column_6)
                    conn_str.commit()
                    print(f'Received message:\n{df}')
            except Exception as e:
                print(f'Error message: {e}')
    else:
        print('No messages received')

The second method I tried was:

messages_to_insert = []
batch_size = 10000
counter = 0

cursor = conn_str.cursor()

while max_offset < end_offset - 1:
    msg = consumer.poll()
    max_offset += 1
    if msg is not None:
        if msg.error():
            print(f'Error while receiving message: {msg.error()}')
        else:
            row = msg.value()
            offset = msg.offset()

            messages_to_insert.append((row["field1"], row["field2"], row["field3"],
                                       row["field4"], row["field5"], row["field6"]))
            counter += 1

            # Send one executemany per 10,000 rows instead of one INSERT per row.
            if counter >= batch_size:
                cursor.executemany(
                    "INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)",
                    messages_to_insert)
                conn_str.commit()
                messages_to_insert = []
                counter = 0

# Flush whatever is left after the loop ends.
if messages_to_insert:
    cursor.executemany(
        "INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)",
        messages_to_insert)
    conn_str.commit()

This method was a little faster, since it commits once per 10,000-row batch instead of once per row.
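On top of batching, pyodbc has a fast_executemany flag that sends an executemany batch as one bulk operation instead of one round trip per parameter set, which often speeds up exactly this kind of insert. A minimal sketch (the connection details and table name are placeholders, not my real ones):

import pyodbc

# Hypothetical connection details -- replace with your own.
conn_str = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=my_server;DATABASE=my_db;UID=my_user;PWD=my_password'
)
cursor = conn_str.cursor()

# With fast_executemany, the whole batch is transmitted in bulk.
cursor.fast_executemany = True

rows = [(1, 'a', 'b', 'c', 'd', 'e')]  # batch accumulated from Kafka
cursor.executemany(
    "INSERT INTO my_table (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)",
    rows)
conn_str.commit()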

The last method writes the data to a .csv file first. Writing to the .csv file is very fast, but loading from the file into MSSQL is still too slow:

import csv

with open('data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['COLUMN_1', 'COLUMN_2', 'COLUMN_3', 'COLUMN_4', 'COLUMN_5', 'COLUMN_6'])

    while max_offset < end_offset - 1:
        msg = consumer.poll()
        max_offset += 1
        if msg is not None:
            if msg.error():
                logger.error(f'Error while receiving message: {msg.error()}')
            else:
                row = msg.value()
                offset = msg.offset()

                # Write the row to the file
                writer.writerow([row['field1'], row['field2'], row['field3'],
                                 row['field4'], row['field5'], row['field6']])

file_path = 'data.csv'

# Read the file back and insert all rows in one executemany call.
with open(file_path, 'r') as f:
    lines = f.readlines()
    lines.pop(0)  # drop the header row

data = [tuple(line.strip().split(',')) for line in lines]
placeholders = ','.join('?' * len(data[0]))

query = f"INSERT INTO <table_name> (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) VALUES ({placeholders})"

cursor = conn_str.cursor()
cursor.executemany(query, data)
conn_str.commit()
conn_str.close()

I also tried loading the .csv file into MSSQL using SSIS, but the load speed was the same.
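Since producing the .csv is the fast part, a server-side BULK INSERT of that same file is another option: SQL Server reads the file itself in one statement instead of receiving rows over ODBC. A rough sketch, assuming SQL Server 2017+ (for FORMAT = 'CSV') and that the path is visible to the SQL Server machine; the table name and path are placeholders:

import pyodbc

conn_str = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=my_server;DATABASE=my_db;UID=my_user;PWD=my_password'
)
cursor = conn_str.cursor()

# BULK INSERT runs on the server, so the path must be readable by the
# SQL Server service account, not just by this script.
cursor.execute(r"""
    BULK INSERT my_table
    FROM 'C:\data\data.csv'
    WITH (
        FORMAT = 'CSV',        -- CSV parsing requires SQL Server 2017+
        FIRSTROW = 2,          -- skip the header row
        FIELDTERMINATOR = ',',
        ROWTERMINATOR = '\n'
    )
""")
conn_str.commit()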



Solution

  • The code below works fine for me; it runs inside the consumer poll loop and flushes a batch of 10,000 messages at a time:

    # decoded_msg is the parsed value of the message returned by poll().
    values_list.append(decoded_msg)

    if len(values_list) == 10000:
        df = pd.DataFrame(values_list)
        column_order = ['col1', 'col2', 'col3']
        df = df.reindex(columns=column_order)

        with engine.connect() as connection:
            print(df)
            try:
                # Insert the whole 10,000-row batch in a single to_sql call.
                df.to_sql('TABLE_NAME', con=connection, if_exists='append',
                          index=False,
                          dtype={'col1': Integer(), 'col2': String(), 'col3': String()},
                          schema='SCHEMA')
                connection.commit()
            except Exception as e:
                print("An error occurred while inserting data: ", e)

        values_list = []
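
    For context, a self-contained sketch of how the surrounding setup might look; the broker address, connection URL, topic, table and column names here are assumptions, not the original values. fast_executemany on the engine speeds up the batched INSERTs that to_sql issues under the hood:

    import json

    import pandas as pd
    from confluent_kafka import Consumer
    from sqlalchemy import create_engine
    from sqlalchemy.types import Integer, String

    # Hypothetical connection URL and broker address.
    engine = create_engine('mssql+pyodbc://my_user:my_password@my_dsn',
                           fast_executemany=True)

    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'mssql-loader',
                         'auto.offset.reset': 'earliest'})
    consumer.subscribe(['my_topic'])

    values_list = []
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error while receiving message: {msg.error()}')
            continue

        decoded_msg = json.loads(msg.value())
        values_list.append(decoded_msg)

        if len(values_list) == 10000:
            df = pd.DataFrame(values_list)
            df = df.reindex(columns=['col1', 'col2', 'col3'])
            df.to_sql('TABLE_NAME', con=engine, if_exists='append', index=False,
                      dtype={'col1': Integer(), 'col2': String(), 'col3': String()},
                      schema='SCHEMA')
            values_list = []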