
I have a SQLite3 DB. I need to parse 10000 files. I read some data from each file and then query the DB with this data to get a result. My code works fine in a single-process environment, but I get an error when trying to use a multiprocessing Pool.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files: 
     foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), files)
4. pool.close()
5. Close DB 

I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.

My DB class is implemented as follows:

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
    sqlite_file -- File path

    Return:
    Connection
    """

    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)

    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
    conn -- Connection
    sqlite_file -- File path
    """

    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
        sqlite_file -- File path
        """

        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file      = sqlite_file
        self._conn             = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor           = self._conn.cursor()

    def close(self):
        """Close DB connection."""

        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
       ...

Function foo() looks like this:

def foo(f, x1, x2, db):
    # extract some data from file f
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)

The overall (non-working) implementation is as follows:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]                 
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)    
pool.close()
mapdb.close()

To fix this, I think I need to create the MapDB() object inside each pool worker (so that there are 4 parallel, independent connections). But I'm not sure how to do this. Can someone show me an example of how to accomplish this with Pool?

1 Answer


An SQLite connection (and its cursor) can't be shared between processes, so each pool worker needs to open its own connection. What about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)   # opens its own connection inside this worker
    # ... process the data, querying mapdb ...
    mapdb.close()

and then change your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    
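
If opening and closing a connection for every one of the 10000 files turns out to be too costly, another variant (just a sketch, reusing MapDB, sqlite_file, x1, x2 and files from the question; init_worker and _mapdb are names I made up) is to open one connection per worker process via Pool's initializer argument, so each of the 4 workers creates its own MapDB once and reuses it for all of its files:

import functools
import multiprocessing

_mapdb = None   # one MapDB per worker process, created by the initializer

def init_worker(db_path):
    # Runs once in every worker process started by the Pool.
    global _mapdb
    _mapdb = MapDB(db_path)

def foo(f, x1, x2):
    # Query through this process's own connection instead of a passed-in db,
    # e.g.:
    #   r1 = _mapdb.get_driver_net(...)
    #   r2 = _mapdb.get_cell_id(...)
    pass

pool = multiprocessing.Pool(processes=4,
                            initializer=init_worker,
                            initargs=(sqlite_file,))
pool.map(functools.partial(foo, x1=x1, x2=x2), files)
pool.close()
pool.join()

The connections are simply released when the worker processes exit after pool.join().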

Update

Another option is to handle the worker threads yourself and distribute work via a Queue.

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
  mapdb = ...open the sqlite database...   # each thread opens its own connection
  while True:
    item = q.get()
    if item[0] == "file":
      filename = item[1]
      ... process filename ...
      q.task_done()
    else:
      # termination token: no more files for this worker
      q.task_done()
      break
  ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
  t = Thread(target=worker)   # use a different name so "worker" still refers to the function
  t.daemon = True
  t.start()

# Place work on the Queue

for x in ...list of files...:
  q.put(("file",x))

# Place termination tokens onto the Queue

for i in range(nworkers):
  q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are used to ensure that the sqlite connections are closed - in case that matters.
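
If it matters that every connection really is closed before the program exits, one small refinement (again just a sketch, using the same worker function and q as above) is to keep references to the Thread objects and join them after q.join(). Because the threads are daemonized, q.join() only guarantees that every queued item was processed, not that the close at the end of worker() has finished:

threads = []
for i in range(nworkers):
  t = Thread(target=worker)
  t.daemon = True
  t.start()
  threads.append(t)

# ... queue the files and the termination tokens as above ...

q.join()          # every queued item has been handled
for t in threads:
  t.join()        # each worker has returned, i.e. closed its sqlite connection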


6 Comments

But if I'm processing 10000 files, then wouldn't I end up opening/closing the DB handle 10000 times? I'm not sure if it will be a significant performance hit, but I'll profile it.
Here is the data: serial run: ~18 minutes to process everything, ~56000 DB queries. 4-worker pool run with your suggestion: ~3 minutes 28 seconds. This should be very good for my needs! Thanks.
I just updated the answer with another approach. :-)
Also - how about accepting and upvoting the answer?
Made a small edit - added a q.task_done() call in the worker routine. See this blog post for more details: troyfawkes.com/learn-python-multithreading-queues-basics
