
I have a system where I am trying to achieve a single point of action for writing to a database. For this I am using queues: I fetch a record from a queue and append it to a list, and once the list holds the required number of records, I push it onto the writer_queue, which is consumed by a different thread that writes the records to the database (SQLite in this case).

However, when I add the list to the writer_queue, it seems that not all records in the list make it through, which leads to unwanted gaps in the final table. I am not sure what is causing this.

Below is the code I am dealing with:

import os
import time
import sqlite3

from queue import Queue
from pydantic import BaseModel
from typing import List, Dict, Optional
from threading import Thread
from dataclasses import dataclass

class Tables(BaseModel):
    max_buffer_length: int = 1000
    rt_table_name: str = ''
    query_schemas: Dict = {
        'RawTable': ['Time', 'Position', 'RPM', 'Flow',
                     'Density', 'Pressure', 'Tension', 'Torque', 'Weight',],
    }
    table_schemas: Dict = {
        'RawTable': ['time', 'position', 'rpm', 'flow',
                     'density', 'pressure', 'tension', 'torque', 'weight',],
    }

    def insert_data(self,
                    buffer: Optional[Queue] = None,
                    db_path: Optional[str] = None,
                    writer_queue: Optional[Queue] = None):

        conn = sqlite3.connect(db_path)
        
        table_name = self.rt_table_name

        cursor = conn.cursor()

        try:                    
            if buffer:
                self.insert_batch_data(buffer, table_name, writer_queue)

        except sqlite3.Error as e:
            print(f"An error occurred: {e}")
            conn.rollback()
        finally:
            conn.commit()
            cursor.close()
            conn.close()

    def insert_batch_data(self, 
                          buffer: Queue,
                          table_name: str, 
                          writer_queue: Queue):
        # Build the INSERT query, then batch incoming records onto the writer queue
        query = self.get_query(table_name)
        batch = []
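        # Pull items from the buffer until the None sentinel arrives, batching as we go.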
        while True:
            if buffer.empty():
                time.sleep(5)
                continue

            item = buffer.get()
            if item is None:
                print("Reached Sentinal Value... Exiting thread...")
                break
            
            batch.append(item)
            if len(batch) == self.max_buffer_length:
                self.add_to_writer_queue(query, table_name, batch, writer_queue)
                print(f"Number of items added to writer_queue: {len(batch)}")
                batch.clear()

        # Insert any remaining records in the batch
        if batch:
            self.add_to_writer_queue(query, table_name, batch, writer_queue)

    def get_query(self, table_name: str) -> str:
        if not table_name:
            raise ValueError("Table name must not be empty")
        columns = self.query_schemas[table_name]
        placeholders = ', '.join(['?' for _ in columns])
        query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
        return query

    def insert_records(self, cursor: sqlite3.Cursor, conn: sqlite3.Connection, query: str, batch: List, table_name: str):
        try:
            columns = self.table_schemas[table_name]
            data_tuples = [
                tuple(getattr(row, col) for col in columns)
                for row in batch
            ]
            cursor.executemany(query, data_tuples)
            conn.commit()
            print(f"Inserted {len(batch)} records into {table_name}")
        except sqlite3.Error as e:
            print(f"SQLite error occurred while inserting records into {table_name}: {e}")
            conn.rollback()
        except Exception as e:
            print(f"Unexpected error occurred while inserting records into {table_name}: {e}")
            conn.rollback()

    def process_db_writes(self,
                          writer_queue: Queue, 
                          db_path: str):
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
                
            while True:
                while writer_queue.empty():
                    time.sleep(2)  # Poll every 2 seconds until a batch arrives
                    
                query, table_name, data = writer_queue.get()
                assert len(data) == self.max_buffer_length, f"Expected {self.max_buffer_length} items, received: {len(data)}\n"
                self.insert_records(cursor, conn, query, data, table_name)
        except Exception as e:
            print(f"Error encountered in process_db_writes: {str(e)}")
        finally:
            cursor.close()
            conn.close()

    def add_to_writer_queue(self, 
                            query: str, 
                            table_name: str, 
                            batch: List, 
                            writer_queue: Queue):
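        # Back-pressure: block while the writer queue is at capacity.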
        while writer_queue.full():
            time.sleep(1)
        assert len(batch) == self.max_buffer_length, f"Expected {self.max_buffer_length} items, received: {len(batch)}\n"
        writer_queue.put((query, table_name, batch))

@dataclass
class RawData:
    time: float 
    position: float = 0.0
    rpm: float = 0.0
    flow: float = 0.0
    density: float = 0.0
    pressure: float = 0.0
    tension: float = 0.0
    torque: float = 0.0
    weight: float = 0.0

class Raw(Tables):
    def __init__(self, **data):
        super().__init__(**data)
        self.rt_table_name = 'RawTable'

    def populate_queue(self, buffer: Queue):
        for i in range(1_000_000):
            while buffer.full():
                time.sleep(1)
            buffer.put(RawData(time=i))
    
def fetch_raw_data(db_path: str, rt_db_path: str, db_writer: Queue, max_buffer_length: int):
    try:
        conn = sqlite3.connect(db_path)

        buffer = Queue(maxsize=max_buffer_length)

        raw = Raw(max_buffer_length=max_buffer_length)
        # Starting other threads
        fetcher = Thread(target=raw.populate_queue, args=(buffer,))
        writer = Thread(target=raw.insert_data, args=(buffer, rt_db_path, db_writer))

        # Start the Threads
        fetcher.start()
        writer.start()

        # Join the threads
        fetcher.join()
        writer.join()
    except KeyboardInterrupt:
        print("Finishing threads due to keyboard interruption.")
        fetcher.join()
        writer.join()
    except Exception as e:
        print("Error encountered: ", e)
    finally:
        if conn:
            conn.close()

def get_rt_db_path(db_path: str, db_extension: str = '.RT'):
    db_dir, db_file = os.path.split(db_path)
    db_name, _ = os.path.splitext(db_file)

    if db_name.endswith('_Raw'):
        db_name = db_name[:-4]
    rt_db_name = db_name + '_' + db_extension
    return os.path.join(db_dir, rt_db_name)

def main():
    db_path = input("Enter absolute path of raw.db file: ")

    try:
        maxsize = 1000
        db_writer = Queue(maxsize=maxsize)
        tables = Tables(max_buffer_length=maxsize)

        rt_db_path = get_rt_db_path(db_path)

        db_writer_thread = Thread(target=tables.process_db_writes, args=(db_writer, rt_db_path))
        # Start the db_writer_thread
        db_writer_thread.start()
        fetch_raw_data(db_path, rt_db_path, db_writer, maxsize)

        db_writer_thread.join()
    except KeyboardInterrupt:
        db_writer_thread.join()
    except Exception:
        db_writer_thread.join()

if __name__ == '__main__':
    main()

The problem seems to occur between Tables.insert_batch_data() and Tables.add_to_writer_queue(). Most of the time, the length of the list passed to add_to_writer_queue() does not match the len(batch) observed inside add_to_writer_queue(). I couldn't find any documented limit on the amount of data a Queue object can carry, so I am very confused about why data is getting lost and how to ensure all of it gets from point A to point B.

1 Answer

Clearing batch also clears the list in the queue before it's processed: writer_queue.put() stores a reference to the list object, not a copy, so batch.clear() empties the very list the writer thread later receives. Put a copy on the queue instead:

writer_queue.put((query, table_name, batch.copy()))
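
Equivalently, you can rebind the name instead of mutating the shared list, so the list already sitting on the queue is left untouched. A minimal sketch of the loop body in insert_batch_data, assuming the surrounding loop is unchanged:

            batch.append(item)
            if len(batch) == self.max_buffer_length:
                self.add_to_writer_queue(query, table_name, batch, writer_queue)
                batch = []  # rebind to a fresh list; the queued list is not touched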

Always pass immutable data or copies when sharing between threads to avoid side effects.
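
For illustration, here is a minimal, self-contained repro of the aliasing using standalone names (not from the code above):

from queue import Queue

q = Queue()
batch = [1, 2, 3]
q.put(batch)         # the queue stores a reference to this exact list
batch.clear()        # mutates the same object the consumer will receive
print(q.get())       # prints [] -- the records are "lost"

batch = [1, 2, 3]
q.put(batch.copy())  # the queue now holds an independent snapshot
batch.clear()
print(q.get())       # prints [1, 2, 3]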


2 Comments

The point of passing a copy of the list totally skipped my mind. Nice catch. Will update on this shortly.
Hey man, thanks for helping out. It did work, and now everything is as expected.
