I have stored strings and their vector embeddings in a SQLite DB file, in a table named "query_metadata". The embeddings are stored as raw numpy bytes. They are used to compute cosine similarity between queries and to return a reranked response based on the similarity score.
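For context, this is roughly how I serialize and deserialize the embeddings and compute the similarity (a minimal sketch; the float32 dtype and the helper names are assumptions for illustration, not my exact code):

import numpy as np

# Assumption: embeddings are float32 vectors serialized with .tobytes()
def embedding_to_blob(vec) -> bytes:
    return np.asarray(vec, dtype=np.float32).tobytes()

def blob_to_embedding(blob: bytes) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32)

def cosine_similarity(a, b) -> float:
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))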
My service is FastAPI-based with 5 worker processes. Each worker opens a connection to the SQLite file (which already exists on disk) during application startup via an AsyncIO scheduler that calls the open_sqlite_conn function with a 'date' trigger, and keeps that connection open. API calls then repeatedly call the get_metadata function to fetch the embeddings for their queries. The connection is never refreshed unless the worker itself restarts.
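The startup wiring looks roughly like this (a sketch assuming APScheduler's AsyncIOScheduler; app, repo, and the run date are placeholders rather than the exact production code):

from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

app = FastAPI()
repo = SvcRepository()
scheduler = AsyncIOScheduler()

@app.on_event("startup")
async def startup() -> None:
    # One-shot 'date' job: open the read-only SQLite connection once per worker
    scheduler.add_job(repo.open_sqlite_conn, trigger="date", run_date=datetime.now())
    scheduler.start()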
The problem: whenever I run a perf test against the service, the memory consumption of the application keeps increasing and does not come down even after the test has ended. I'm looking for a way to put a cap on the RAM usage or to detect any memory leaks that might be present.
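To check whether the growth comes from Python objects at all, I have been considering snapshotting allocations with tracemalloc (a sketch only; the /debug/memory endpoint and the app object are placeholders I would add just for testing):

import tracemalloc

tracemalloc.start()

@app.get("/debug/memory")
async def memory_snapshot():
    # Report the top allocation sites so growth between perf runs can be compared
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics("lineno")[:10]
    return {"top_allocations": [str(stat) for stat in top_stats]}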
import logging
import os
import traceback
from datetime import datetime

import aiosqlite

logger = logging.getLogger(__name__)


# SingletonMeta and AppConfigParser come from the project's own modules
class SvcRepository(metaclass=SingletonMeta):
    def __init__(self) -> None:
        conf = AppConfigParser()
        self.file_path = conf.parse_config("datasource.local_file_path")
        self.term_metadata_table = "query_metadata"
        self.sqlite_conn = None

    async def open_sqlite_conn(self):
        logger.info(f"Sqlite connection initiation at: {datetime.now()}")
        try:
            if os.path.isfile(self.file_path):
                if self.sqlite_conn is None:
                    # Read-only connection, kept open for the lifetime of the worker
                    self.sqlite_conn = await aiosqlite.connect(f"file:{self.file_path}?mode=ro", uri=True)
                async with self.sqlite_conn.cursor() as exec_cursor:
                    await exec_cursor.execute('PRAGMA cache_size = 0')
                    await exec_cursor.execute('PRAGMA encoding = "UTF-8"')
                    await exec_cursor.execute('PRAGMA synchronous = NORMAL')
                    await exec_cursor.execute('PRAGMA temp_store = FILE')
                    await exec_cursor.execute('PRAGMA hard_heap_limit = 67108864')  # 64 MiB
                logger.info(f"Sqlite connection refreshed at: {datetime.now()}")
        except Exception as ex:
            logging.error(f"Failed to refresh sqlite connection {ex}")
            logging.error(f"Sqlite connection refresh failed due to {traceback.format_exc()}")

    async def get_metadata(self, queries):
        query_metadata = {}
        if self.sqlite_conn is not None:
            async with self.sqlite_conn.cursor() as exec_cursor:
                search_terms = list(set(queries))
                if search_terms:
                    # One placeholder per search term
                    query_for_terms_metadata = (
                        f"SELECT query, embedding FROM {self.term_metadata_table} WHERE query IN "
                        f"({','.join(['?'] * len(search_terms))})"
                    )
                    try:
                        await exec_cursor.execute(query_for_terms_metadata, search_terms)
                        term_data = await exec_cursor.fetchall()
                        if term_data:
                            # Map each query to its remaining columns (the raw embedding bytes)
                            query_metadata = {item[0]: item[1:] for item in term_data}
                    except Exception as ex:
                        logging.error(f"Failed to execute query on terms table {ex}")
                        logging.error(f"Terms table query failed due to {traceback.format_exc()}")
        return query_metadata
I ran two types of perf tests to narrow down the issue:
Perf with a pool of 30k queries: each request picks 5 queries at random from this pool and hits the API at 100 TPS for 20 minutes. Results: memory consumption kept increasing during the test even after capping the cache size, and it did not come down after the test ended. If I ran multiple tests with the same parameters, memory consumption kept climbing from the previous high levels.
Perf with a fixed set of 5 queries: each request hits the API with the same 5 queries at 100 TPS for 20 minutes. Results: memory consumption remained flat; there was no increase compared to the memory levels before the test.
The second test suggests there is no memory leak in my own code and that the increase in memory mostly comes from SQLite.
I tried the cache_size and hard_heap_limit pragmas, but they didn't work. I set cache_size to 0 so that every SQLite read goes to disk and nothing is kept in the page cache, but it didn't help.
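To rule out the pragmas being silently ignored, something like this could read the values back after they are set (a sketch reusing the repository's connection; a pragma issued without a value returns the current setting as a single row):

import logging
import aiosqlite

logger = logging.getLogger(__name__)

async def log_sqlite_limits(conn: aiosqlite.Connection) -> None:
    # Read back the pragma values to confirm they were actually applied
    for pragma in ("cache_size", "hard_heap_limit", "temp_store"):
        async with conn.execute(f"PRAGMA {pragma}") as cursor:
            row = await cursor.fetchone()
            logger.info(f"PRAGMA {pragma} = {row[0] if row else None}")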
I'm looking for a way to put a cap on the RAM usage or to detect any memory leaks that might be present.
Articles referred to: