Tags: python, multithreading, python-2.7, multiprocessing, dbf

Python - Multi-Threading Help - Reading Multiple Files - ETL Into SQL Server


I'm working on a program that reads DBF files from a local drive and loads the data into SQL Server tables. I'm pretty green with Python, and most of the material I've found on multi-threading is confusing. The reading and inserting are slow, and looking at my CPU usage I have plenty of spare capacity. I'm also running SSDs.

This code will be expanded to read about 20 DBF files from each of roughly 400 zips, so we're talking about 8,000 DBF files in total.

I'm having a hard time doing this. Can you provide pointers?

Here's my code (it's a little messy, but I'll clean it up later):

import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile

# SQL Server Connection Test
# raw string so the \t in the server name isn't treated as a tab escape
cnxn = pyodbc.connect(r'DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()

dr = 'e:\\Backups\\dbf\\'
work = 'e:\\Backups\\work\\'
archive = 'e:\\Backups\\archive\\'


# process each zip in the backup folder: move it to the work folder,
# extract it, log the backup, then load its DBF files
for r in os.listdir(dr):

    curdate = datetime.datetime.now()
    filepath = dr + r
    process = work + r
    arc = archive + r

    pth = r.replace(".sss","")
    zipfolder = work + pth
    filedateunix = os.path.getctime(filepath)
    filedateconverted = datetime.datetime.fromtimestamp(
        int(filedateunix)).strftime('%Y-%m-%d %H:%M:%S')
    shutil.move(filepath,process)
    with ZipFile(process) as zf:
        zf.extractall(zipfolder)


    cursor.execute(
        "insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
        pth, filedateconverted, curdate)
    cnxn.commit()

    # walk the extracted folder looking for DBF files to load
    for dirpath, subdirs, files in os.walk(zipfolder):

        for file in files:
            dateadded = datetime.datetime.now()

            if file.endswith(('.dbf','.DBF')):
                dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()

                if "\\bk.dbf" in dbflocation:
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec1 = str(record['code'])
                        rec2 = str(record['name'])
                        rec3 = str(record['addr1'])
                        rec4 = str(record['addr2'])
                        rec5 = str(record['city'])
                        rec6 = str(record['state'])
                        rec7 = str(record['zip'])
                        rec8 = str(record['tel'])
                        rec9 = str(record['fax'])
                        cursor.execute(
                            "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
                            rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
                cnxn.commit()


                if "\\cr.dbf" in dbflocation:
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec2 = str(record['cal_desc'])
                        rec3 = str(record['b_date'])
                        rec4 = str(record['b_time'])
                        rec5 = str(record['e_time'])
                        rec6 = str(record['with_desc'])
                        rec7 = str(record['recuruntil'])
                        rec8 = record['notes']
                        rec9 = dateadded
                        cursor.execute(
                            "insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
                            rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
                cnxn.commit()

    shutil.move(process, archive)
    shutil.rmtree(zipfolder)

Solution

  • tl;dr: Measure first, fix later!


    Note that in the most common Python implementation (CPython), the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode. So threads are not a good way to increase CPU-bound performance. They can work well if the work is I/O-bound.
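
    As a small illustration of the I/O-bound case: multiprocessing.dummy provides a pool of threads behind the same API as multiprocessing.Pool, and it is available on both Python 2.7 and 3. The sketch below overlaps the disk waits while extracting the zips; the extract_zip() helper is hypothetical, standing in for the move-and-extract part of your loop.

    import os
    from multiprocessing.dummy import Pool as ThreadPool  # thread pool, not processes
    from zipfile import ZipFile

    dr = 'e:\\Backups\\dbf\\'
    work = 'e:\\Backups\\work\\'

    def extract_zip(zip_path):
        # hypothetical helper: unzip one archive into its own work folder;
        # extraction is mostly disk I/O, so threads can overlap the waiting
        dest = os.path.join(work, os.path.basename(zip_path).replace('.sss', ''))
        with ZipFile(zip_path) as zf:
            zf.extractall(dest)
        return dest

    zips = [os.path.join(dr, name) for name in os.listdir(dr)]
    pool = ThreadPool(4)                    # four worker threads
    folders = pool.map(extract_zip, zips)   # blocks until every zip is extracted
    pool.close()
    pool.join()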

    But what you should do first and foremost is measure. This cannot be stressed enough. If you don't know what is causing the poor performance, you cannot fix it!

    Write single-threaded code that does the job, and run that under a profiler. Try the built-in cProfile first. If that doesn't give you enough information, try a line profiler such as line_profiler.
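
    For example, assuming you first wrap the script's module-level code in a main() function, cProfile can record one full run and then show the hot spots, like this:

    import cProfile
    import pstats

    # profile one complete ETL run and save the raw timings to a file
    cProfile.run('main()', 'etl.prof')

    # print the 20 most expensive calls, sorted by cumulative time
    # (time spent in each function plus everything it calls)
    stats = pstats.Stats('etl.prof')
    stats.sort_stats('cumulative').print_stats(20)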

    The profiling should tell you which steps are consuming the most time. Once you know that, you can start improving.

    For instance, it doesn't make sense to use multiprocessing for reading DBF files if it is the act of stuffing the data into SQL Server that takes the most time! That could even slow things down, because several processes would then be fighting for the attention of the SQL server.

    If the SQL server is not the bottleneck, and it can handle multiple connections, I would use multiprocessing, probably Pool.map(), to read DBFs in parallel and stuff the data into the SQL server. In that case, you should Pool.map over a list of DBF file names, so that the files are opened in the worker processes.
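
    A rough sketch of that shape, under some assumptions: the load_dbf() worker below is hypothetical and only handles the bk.dbf case, every worker opens its own connection (pyodbc connections cannot be shared across processes), and the __main__ guard is required for multiprocessing on Windows.

    import os
    import pyodbc
    from multiprocessing import Pool
    from dbfread import DBF

    CONN_STR = ('DRIVER={SQL Server};SERVER=localhost\\test;'
                'DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')

    def load_dbf(dbf_path):
        # each worker process opens its own SQL Server connection
        cnxn = pyodbc.connect(CONN_STR)
        cursor = cnxn.cursor()
        table = DBF(dbf_path, lowernames=True, char_decode_errors='ignore')
        for record in table:
            cursor.execute(
                "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
                str(record['code']), str(record['name']), str(record['addr1']),
                str(record['addr2']), str(record['city']), str(record['state']),
                str(record['zip']), str(record['tel']), str(record['fax']))
        cnxn.commit()
        cnxn.close()
        return dbf_path

    if __name__ == '__main__':
        # build the list of file names in the parent; the files themselves
        # are only opened inside the worker processes
        dbf_files = []
        for dirpath, subdirs, files in os.walk('e:\\Backups\\work\\'):
            for name in files:
                if name.lower() == 'bk.dbf':
                    dbf_files.append(os.path.join(dirpath, name))

        pool = Pool(processes=4)
        pool.map(load_dbf, dbf_files)   # one DBF per task, read in parallel
        pool.close()
        pool.join()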