
I'm downloading data from an API and storing it in an SQLite database. I want to implement the process using multithreading. Can someone please help me with how to implement it?

I found a library, but I'm getting an error. Below is the code.

import sqlite3
import os

import pandas as pd
from sodapy import Socrata

import concurrent.futures

dbPath = 'folder where db exists'
dbName = 'db file name'

## Set up the connection to the DB
dbConn = sqlite3.connect(os.path.join(dbPath, dbName), check_same_thread=False)

## Set up the API client that brings in the data
client = Socrata("health.data.ny.gov", None)

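## Define all the counties to be used in threading
countys = [...]  # all 62 counties in New York (list omitted)

## One independent dict per county: dict.fromkeys(countys, {}) would make
## every county share the SAME dict, so threads would overwrite each other
varDict = {county: {} for county in countys}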
strDataList = ['test_date', 'LoadDate']
intDataList = ['new_positives', 'cumulative_number_of_positives', 'total_number_of_tests', 'cumulative_number_of_tests']


def getData(county):
    
    ## Check if table exists
    print("Processing ", county)
    varDict[county]['dbCurs'] = dbConn.cursor()
    varDict[county]['select'] = varDict[county]['dbCurs'].execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (county,))
    if not len(varDict[county]['select'].fetchall()):
        createTable(county)
    
    whereClause = 'county="'+county+'"'
    varDict[county]['results'] = client.get("xdss-u53e", where=whereClause)
    varDict[county]['data'] = pd.DataFrame.from_records(varDict[county]['results'])
    varDict[county]['data'].drop(['county'], axis=1, inplace=True)
    varDict[county]['data']['LoadDate'] = pd.to_datetime('now')
    varDict[county]['data'][strDataList] = varDict[county]['data'][strDataList].astype(str)
    varDict[county]['data']['test_date'] = varDict[county]['data']['test_date'].apply(lambda x: x[:10])
    varDict[county]['data'][intDataList] = varDict[county]['data'][intDataList].astype(int)
    varDict[county]['data'] = varDict[county]['data'].values.tolist()

    ## Insert values into SQLite
    varDict[county]['sqlQuery'] = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'
    varDict[county]['dbCurs'].executemany(varDict[county]['sqlQuery'], varDict[county]['data'])
    dbConn.commit()
    
# for i in dbCurs.execute('SELECT * FROM albany'):
#     print(i)

def createTable(county):
    
    sqlQuery = 'CREATE TABLE ['+county+'] ( [Test Date] TEXT, [New Positives] INTEGER NOT NULL, [Cumulative Number of Positives] INTEGER NOT NULL, [Total Number of Tests Performed] INTEGER NOT NULL, [Cumulative Number of Tests Performed] INTEGER NOT NULL, [Load date] TEXT NOT NULL, PRIMARY KEY([Test Date]))'
    varDict[county]['dbCurs'].execute(sqlQuery)
    

# for _ in countys:
#     getData(_)
    
# x = countys[:5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    # results = [executor.submit(getData, y) for y in x]
    executor.map(getData, countys)
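A side note on the last line of the code above: executor.map(...) returns a lazy iterator, and an exception raised inside a worker is only re-raised when that iterator is consumed. If the result is discarded, as it is here, a failing getData call can go unnoticed. A minimal sketch, using a hypothetical work function in place of getData:

```python
import concurrent.futures


def work(n):
    # Hypothetical stand-in for getData: fails for one input.
    if n == 3:
        raise ValueError(f"bad input {n}")
    return n * n


with concurrent.futures.ThreadPoolExecutor() as executor:
    # map() submits every task and returns a lazy iterator of results.
    results = executor.map(work, range(5))

# No exception has been raised yet; it surfaces only when the failing
# result is reached while iterating.
collected = []
error = None
try:
    for value in results:
        collected.append(value)
except ValueError as exc:
    error = str(exc)

print(collected, error)
```

Wrapping the call as list(executor.map(getData, countys)) is one simple way to make such errors visible.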

getData is the function that fetches the data for one county and loads it into the database, and countys is the list of all the counties. Running it synchronously works, but I would like to use multithreading. The synchronous loop (which works) is:

for _ in countys:
    getData(_)

The error message is

ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8016 and this is thread id 19844.
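This error comes from the sqlite3 module's thread check: by default (check_same_thread=True) a connection may only be used by the thread that created it. A minimal reproduction, independent of the API code, using an in-memory database:

```python
import sqlite3
import threading

# Created in the main thread with the default check_same_thread=True.
conn = sqlite3.connect(":memory:")
caught = []


def use_connection():
    # Runs in a different thread, so sqlite3 refuses to use conn here.
    try:
        conn.execute("SELECT 1")
    except sqlite3.ProgrammingError as exc:
        caught.append(str(exc))


worker = threading.Thread(target=use_connection)
worker.start()
worker.join()
conn.close()

print(caught)
```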
  • You can fetch data with multiple threads, but only one thread can write to an SQLite database at a time. Commented Jan 31, 2021 at 17:21
  • Any idea how I can change the code so that a new thread doesn't interfere while an old one is still working? Commented Jan 31, 2021 at 17:24
  • This is the code that does the insert: sqlQuery = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'; dbCurs.executemany(sqlQuery, data); dbConn.commit() Commented Jan 31, 2021 at 17:24
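Following the first comment's suggestion, here is a sketch of the "fetch in many threads, write from one thread" pattern: the workers only download, and the main thread, which owns the connection, performs every insert. fetch_county, the county subset, and the single results table are illustrative assumptions, not part of the original code:

```python
import concurrent.futures
import sqlite3


def fetch_county(county):
    # Hypothetical stand-in for the Socrata call in getData: returns rows
    # shaped (test_date, new_positives) and never touches the database.
    return [("2021-01-30", len(county)), ("2021-01-31", len(county) + 1)]


counties = ["Albany", "Bronx", "Kings"]  # illustrative subset

conn = sqlite3.connect(":memory:")  # only the main thread ever uses this
conn.execute("CREATE TABLE results (county TEXT, test_date TEXT, new_positives INTEGER)")

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Network I/O runs in the worker threads...
    futures = {executor.submit(fetch_county, c): c for c in counties}
    # ...while all writes happen here, in the thread that owns conn.
    for future in concurrent.futures.as_completed(futures):
        county = futures[future]
        rows = future.result()  # re-raises any exception from the worker
        conn.executemany(
            "INSERT INTO results VALUES (?, ?, ?)",
            [(county, d, n) for d, n in rows],
        )
conn.commit()

total = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
print(total)
```

Because only one thread ever uses the connection, this works with the default check_same_thread=True and needs no lock.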

1 Answer


You might find this useful:

sqlite3.connect(":memory:", check_same_thread=False)
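check_same_thread=False only turns off the ownership check; the Python sqlite3 documentation notes that write operations may then need to be serialized by the user. Also note that ":memory:" opens a private in-memory database, so anything written to it is not saved to the file at os.path.join(dbPath, dbName). A sketch that shares one connection but guards it with a threading.Lock (load_county and its rows are hypothetical):

```python
import concurrent.futures
import sqlite3
import threading

# The ownership check is disabled, so access must be serialized manually.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE results (county TEXT, value INTEGER)")
db_lock = threading.Lock()


def load_county(county):
    rows = [(county, i) for i in range(3)]  # hypothetical fetched data
    with db_lock:  # only one thread at a time touches the connection
        conn.executemany("INSERT INTO results VALUES (?, ?)", rows)
        conn.commit()
    return county


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    done = list(executor.map(load_county, ["Albany", "Bronx", "Kings", "Queens"]))

count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
print(done, count)
```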

5 Comments

I'm not getting the error anymore, but my code hangs.
Maybe you have an unrelated bug. Could you gist the relevant lines of code?
I'm not able to figure out which line is actually causing the issue. I have attached the full code for your reference.
What exactly does this do?
I have added comments and an overview. Can you please tell me what part is confusing you.
