
I'm working on a program that reads DBF files from a local drive and loads the data into SQL Server tables. I'm pretty green with Python, and the details I've found on multi-threading are mostly confusing. The performance of the reading and inserting is slow, and looking at my CPU usage I have plenty of capacity. I'm also running SSDs.

This code will be expanded to read about 20 DBF files from each of about 400 zips, so we're talking about 8,000 DBF files in total.

I'm having a hard time doing this. Can you provide pointers?

Here's my code (it's a little messy, but I'll clean it up later):

import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile

# SQL Server Connection Test
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()

dr = 'e:\\Backups\\dbf\\'
work = 'e:\\Backups\\work\\'
archive = 'e:\\Backups\\archive\\'


for r in os.listdir(dr):

    curdate = datetime.datetime.now()
    filepath = dr + r
    process = work + r
    arc = archive + r

    pth = r.replace(".sss","")
    zipfolder = work + pth
    filedateunix = os.path.getctime(filepath)
    filedateconverted = datetime.datetime.fromtimestamp(int(filedateunix)).strftime('%Y-%m-%d %H:%M:%S')
    shutil.move(filepath,process)
    with ZipFile(process) as zf:
        zf.extractall(zipfolder)


    cursor.execute(
        "insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
        pth, filedateconverted, curdate)
    cnxn.commit()

    for dirpath, subdirs, files in os.walk(zipfolder):

        for file in files:
            dateadded = datetime.datetime.now()

            if file.endswith(('.dbf','.DBF')):
                dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()

                if dbflocation.__contains__("\\bk.dbf"):
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec1 = str(record['code'])
                        rec2 = str(record['name'])
                        rec3 = str(record['addr1'])
                        rec4 = str(record['addr2'])
                        rec5 = str(record['city'])
                        rec6 = str(record['state'])
                        rec7 = str(record['zip'])
                        rec8 = str(record['tel'])
                        rec9 = str(record['fax'])
                        cursor.execute(
                            "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
                            rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
                cnxn.commit()


                if dbflocation.__contains__("\\cr.dbf"):
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec2 = str(record['cal_desc'])
                        rec3 = str(record['b_date'])
                        rec4 = str(record['b_time'])
                        rec5 = str(record['e_time'])
                        rec6 = str(record['with_desc'])
                        rec7 = str(record['recuruntil'])
                        rec8 = record['notes']
                        rec9 = dateadded
                        cursor.execute(
                        "insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
                        rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
                cnxn.commit() 

    shutil.move(process, archive)
    shutil.rmtree(zipfolder)
  • Another option I'm open to is multiprocessing, which may be simpler. Commented Feb 6, 2017 at 18:28

2 Answers


tl;dr: Measure first, fix later!


Note that in the most common Python implementation (CPython), only one thread at a time can execute Python bytecode, because of the Global Interpreter Lock (GIL). So threads are not a good way to improve CPU-bound performance. They can work well if the work is I/O-bound.

But what you should do first and foremost is measure. This cannot be stressed enough. If you don't know what is causing the poor performance, you cannot fix it!

Write single-threaded code that does the job, and run it under a profiler. Try the built-in cProfile first. If that doesn't give you enough information, try e.g. a line profiler.
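
For example, a minimal sketch, assuming you wrap the import logic in a main() function (the script above currently runs at module level):

import cProfile
import pstats

# Profile one full import run and save the stats to a file.
cProfile.run('main()', 'import_profile')

# Print the 20 entries with the largest cumulative time.
pstats.Stats('import_profile').sort_stats('cumulative').print_stats(20)

You can also skip the wrapper and just run python -m cProfile -s cumulative yourscript.py from the command line.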

The profiling should tell you which steps are consuming the most time. Once you know that, you can start improving.

For instance, it doesn't make sense to use multiprocessing for reading the DBF files if it is the act of stuffing the data into SQL Server that takes the most time! That could even slow things down, because several processes would then be fighting for the attention of the SQL server.

If the SQL server is not the bottleneck, and it can handle multiple connections, I would use multiprocessing, probably Pool.map(), to read DBFs in parallel and stuff the data into the SQL server. In that case, you should Pool.map over a list of DBF file names, so that the files are opened in the worker processes.
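
A rough sketch of that idea, reusing the connection string and work folder from the question. load_bk() is a hypothetical worker that only handles bk.dbf with a trimmed column list, and each worker opens its own connection, because pyodbc connections can't be shared across processes:

import glob
from multiprocessing import Pool

import pyodbc
from dbfread import DBF

def load_bk(dbf_path):
    # One connection per worker process (connection string copied from the question).
    cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\\test;'
                          'DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
    cursor = cnxn.cursor()
    table = DBF(dbf_path, lowernames=True, char_decode_errors='ignore')
    # Column list trimmed for brevity; extend it to match the real table.
    rows = [(str(r['code']), str(r['name']), str(r['city'])) for r in table]
    cursor.executemany(
        "insert into tblbk(code, name, city) values (?, ?, ?)", rows)
    cnxn.commit()
    cnxn.close()

if __name__ == '__main__':
    # Collect the extracted DBF paths, then let a pool of workers load them in parallel.
    dbf_files = glob.glob(r'e:\Backups\work\**\bk.dbf', recursive=True)
    with Pool(processes=4) as pool:
        pool.map(load_bk, dbf_files)

The important part is that each worker receives a file name rather than an open DBF or connection object, so everything heavy is created inside the worker process.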


2 Comments

Thanks Roland, you're absolutely right on this one. There was no bottleneck other than my code. I was able to use the multiprocessing module and launched 15 processes at a time with a 1-second delay, because I noticed that sometimes the processes attempted to pick up the same file. Now I'm getting a SQL Server CPU bottleneck, which is exactly what I wanted to see. SQL Server CPU usage went from about 8% to 50%, and the time to process 8k files went from 10+ hours to 45 minutes!
@HMan06 More than 10x faster? Nice!

You can try the executemany() method instead of single inserts in a loop. Here is an example of an insert function from an ETL script:

def sql_insert(table_name, fields, rows, truncate_table=True):
    if len(rows) == 0:
        return

    # mdwh_connection is the script's module-level pyodbc connection;
    # sql_truncate() is a helper defined elsewhere in the same script.
    cursor = mdwh_connection.cursor()
    cursor.fast_executemany = True  # requires pyodbc >= 4.0.19
    values_sql = ('?, ' * (fields.count(',') + 1))[:-2]

    if truncate_table:
        sql_truncate(table_name, cursor)

    insert_sql = 'insert {0} ({1}) values ({2});'.format(table_name, fields, values_sql)
    current_row = 0
    batch_size = 50000

    while current_row < len(rows):
        cursor.executemany(insert_sql, rows[current_row:current_row + batch_size])
        mdwh_connection.commit()
        current_row += batch_size
        logging.info(
            '{} more records inserted. Total: {}'.format(
                min(batch_size, len(rows) - current_row + batch_size),
                min(current_row, len(rows))
            )
        )
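
Applied to the question's bk.dbf loop, the same idea might look roughly like this (same table and columns as in the question; fast_executemany needs pyodbc 4.0.19 or later):

table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
rows = [
    (str(r['code']), str(r['name']), str(r['addr1']), str(r['addr2']),
     str(r['city']), str(r['state']), str(r['zip']), str(r['tel']), str(r['fax']))
    for r in table
]
cursor.fast_executemany = True  # send each batch in one round trip instead of one per row
cursor.executemany(
    "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) "
    "values (?,?,?,?,?,?,?,?,?)",
    rows)
cnxn.commit()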
