Hi, we were trying to parallelize a huge SELECT by chopping it into smaller SELECTs. The dataset has a "segment" column, so we used it to partition the query. Our target was a PostgreSQL database. Unfortunately we did not observe the performance benefit we expected, i.e. a speed-up roughly linear in the number of threads.
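For context, each per-segment query in the real workload looked roughly like the sketch below; the table name and the parameter binding are illustrative, not our actual schema:

# illustrative sketch only: "sales" and the %(segment)s binding are made up here
SEGMENT_QUERY = '''
    select item_id, brand_name, is_exclusive, units, revenue,
           abs_price, segment, matches_filter
    from sales
    where segment = %(segment)s
'''
# executed per segment as: curs.execute(SEGMENT_QUERY, {'segment': segment})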
We managed to reproduce our observations with a synthetic test case. We simulate the multiple fetches (11 of them), each one reading from a generate_series query.
We run either with 1 connection, fetching the segments sequentially, or with 11 connections, fetching them in parallel.
We observed no performance benefit.
However, if we simulate each fetch as a single-row query that blocks for 5 seconds (QUERY1), we get the expected performance benefit.
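Concretely, the only things we flip between runs are TASKS in config.py and the query used inside pandas_per_segment, roughly:

# sketch of the two runs we compare (values as in config.py below)
TASKS = 1     # 1 pooled connection / 1 worker thread: segments fetched sequentially
# TASKS = 11  # 11 pooled connections / 11 worker threads: segments fetched in parallel

# and inside pandas_per_segment() we swap config.QUERY2 (generate_series fetch)
# for config.QUERY1 (single row behind pg_sleep(5)) to compare the two cases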
The main code we use to parallelize:
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks: Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end - start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list),
                          columns=['item_id', 'brand_name', 'is_exclusive', 'units',
                                   'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
With direct fetching from CSVs we did see the expected performance benefit.
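Roughly, the CSV variant keeps the same thread pool and replaces the DB fetch with a file read; a minimal sketch, with illustrative file names rather than our real layout:

import pandas as pd
from concurrent import futures

def csv_per_segment(segment):
    # one CSV per segment; the naming scheme here is made up for the sketch
    return pd.read_csv(f'segment_{segment}.csv')

with futures.ThreadPoolExecutor(max_workers=11) as executor:
    frames = list(executor.map(csv_per_segment, range(11)))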
Our theory is that sockets, threads, and large data fetches in Python do not play well together.
Is this correct? Are we doing something wrong?
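One untested variation we could use to isolate this would be to swap the thread pool for a process pool, so each segment is fetched and deserialized in its own interpreter over its own connection (sketch only, fetch_segment is not part of the attached code):

from concurrent import futures
import psycopg2
import config

def fetch_segment(segment):
    # each worker process opens its own connection instead of sharing a pool
    conn = psycopg2.connect(host=config.HOST, port=config.PORT,
                            dbname=config.DBNAME, user=config.USER,
                            password=config.PASSWORD)
    try:
        with conn.cursor() as curs:
            curs.execute(config.QUERY2)
            return curs.fetchall()
    finally:
        conn.close()

if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=config.SEGMENTS_NO) as executor:
        results = list(executor.map(fetch_segment, range(config.SEGMENTS_NO)))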
Tested on macOS Big Sur x64, Python 3.9.6, PostgreSQL 13; the rest of the code is attached below.
Our docker-compose file:
version: '2'
services:
  database:
    container_name: posgres
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      - '5432:5432'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper
volumes:
  postgresql_data:
    driver: local
networks:
  pgtapper:
    driver: bridge
The config.py file:
TASKS = 1
SEGMENTS_NO = 11
HOST='localhost'
PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'
# PORT=15433
# DBNAME='newron'
# USER='flyway'
# PASSWORD='8P87PE8HKuvjQaAP'
CONNECT_TIMEOUT=600
QUERY1 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from (select pg_sleep(5)) xxx
'''
QUERY3 = '''
select * from t1 LIMIT 10000
'''
QUERY2 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from generate_series(1, 10000)
'''
MYSQL_QUERY = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from t1
limit 10000
'''
And our full example:
import itertools
from psycopg2.pool import ThreadedConnectionPool
from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks: Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end - start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list),
                          columns=['item_id', 'brand_name', 'is_exclusive', 'units',
                                   'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
if __name__ == '__main__':
    connection_pool = ThreadedConnectionPool(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT
    )
    get_sales(connection_pool)