
Hi, we were trying to parallelize a huge SELECT by splitting it into smaller SELECTs. The dataset has a "segment" column, so we used it to partition the query. Our target was a PostgreSQL database. Unfortunately we did not observe a performance benefit, i.e. a speed-up roughly linear in the number of threads used.
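For context, each per-segment query has roughly this shape (the table name here is illustrative, not our real schema):

select item_id, brand_name, is_exclusive, units, revenue, abs_price, segment, matches_filter
from sales
where segment = %(segment)s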

We were able to isolate our observations in a synthetic test case. We simulate the multiple fetches (11 of them), each fetching from a generate_series query.

We used either 1 connection, with each fetch running sequentially, or 11 connections to run them in parallel.

We observed no performance benefit.

However, if we simulated each fetch as a single-row query that blocks for 5 seconds (QUERY1), we got the expected performance benefit.

The main code we use to parallelize:

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

With direct fetching from CSVs we did see the expected performance benefit.
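Roughly how we simulated the CSV variant (a sketch only; the per-segment file names are hypothetical):

from concurrent import futures
import pandas as pd

def read_segment_csv(segment):
    # one hypothetical CSV file per segment
    return pd.read_csv(f"segment_{segment}.csv")

with futures.ThreadPoolExecutor(max_workers=11) as executor:
    frames = list(executor.map(read_segment_csv, range(11)))
result = pd.concat(frames, ignore_index=True)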

Our theory is that sockets/threads/big data fetches in Python do not play well together.

Is this correct? Are we doing something wrong?

Tested on macOS Big Sur x64, Python 3.9.6, PostgreSQL 13; the rest of the code is attached.

Our docker-compose file:

version: '2'

services:

  database:
    container_name:
      posgres
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      - '5432:5432'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper

volumes:
  postgresql_data:
    driver: local

networks:
  pgtapper:
    driver: bridge

The config.py file:

TASKS = 1
SEGMENTS_NO = 11
HOST='localhost'

PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'


# PORT=15433
# DBNAME='newron'
# USER='flyway'
# PASSWORD='8P87PE8HKuvjQaAP'


CONNECT_TIMEOUT=600

QUERY1 = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from (select pg_sleep(5)) xxx
'''

QUERY3 = '''
 select * from t1 LIMIT 10000
'''

QUERY2 = '''
 select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from generate_series(1, 10000)
'''


MYSQL_QUERY = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from t1
limit 10000
'''

And our full example:


# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import itertools

from psycopg2.pool import ThreadedConnectionPool

from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    connection_pool = ThreadedConnectionPool(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT
    )
    get_sales(connection_pool)

# See PyCharm help at https://www.jetbrains.com/help/pycharm/


5 Comments
  • try using psycopg2.extras.execute_batch; the issue here is that separating your queries won't improve performance, since it increases the server round-trip time. The database won't fetch the results faster if it is being asked multiple times. Commented Jul 8, 2021 at 12:19
  • execute_batch serializes, and the documentation says it is a good alternative to executemany, which is mostly for updating. I'm not updating; I am fetching, as shown above. Commented Jul 8, 2021 at 12:27
  • It is the same amount of data that has to be fetched from disk, and the same amount of data to be sent over the network. Why do you think threading could help? (except, maybe, at the client side) Commented Jul 8, 2021 at 12:56
  • The client side is my requirement. I have a dataset; each row has a segment field with one of 11 possible values. Instead of fetching the whole dataset, spawn 11 threads to fetch the parts of the dataset that correspond to the respective segment and compose the final result at the client. I have posted my GitHub below, you can clone and run raw_main.py from there (you need to start docker-compose). Commented Jul 8, 2021 at 14:38
  • This library may help: pola-rs.github.io/polars/py-polars/html/reference/api/… Commented Jun 24, 2022 at 9:15

2 Answers


After some tests with psycopg3 and asyncpg I settled on the connector-x library. Having a PostgreSQL table with three columns (time: timestamp, variable: text, value: double precision) and 3 million rows, the following code returns the data as a pandas DataFrame in 2 seconds:

from time import time
import connectorx as cx

print("Fetch start...")
start = time()
df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    "SELECT * FROM variables")
end = time()

print(f"Fetched {len(df.index)} records in {end - start} ")

With partitioning/parallelizing into 4 threads it returns in 1.2 seconds:

df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    [
        "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'",
    ])

And the same code returns the data in arrow2 format in under a second:

df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    [
        "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
        "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'",
    ],
    return_type="arrow2")
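connector-x can also split a single query by itself via partition_on/partition_num instead of a hand-written query list; a sketch, assuming the table has a numeric id column to split on (not part of the schema described above):

import connectorx as cx

# let connector-x split the query into 4 parallel partitions on a numeric
# column; "id" is a hypothetical column, not in the table above
df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    "SELECT * FROM variables",
    partition_on="id",
    partition_num=4)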

2 Comments

Very interesting answer, I hadn't heard about that library. I need to run the tests myself. Which version of Postgres, though? Is the Arrow conversion happening in Postgres?
I used TimescaleDB, but before transforming the data to column-based storage. TimescaleDB also automatically partitions data by timestamp, but you can partition table data in plain Postgres too. Also, fetching the data in Arrow format and transforming it to pandas in your app saves a couple of hundred milliseconds :)

With your generate_series query, almost all the time is spent in Python reading and processing the data, and nearly none is spent in PostgreSQL computing and sending it.

It looks like something in ThreadedConnectionPool (or maybe the global interpreter lock) coordinates access to your database connections (visible as futex calls), so that only one thread can be "active" in Python at a time. So while many queries can be running on the database at the same time, that doesn't help you, because almost no time is actually spent there.
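If that is the case, one quick way to check is to sidestep the GIL with processes instead of threads, e.g. concurrent.futures.ProcessPoolExecutor. A minimal sketch against your config (each worker opens its own connection, since psycopg2 connections cannot be shared across processes):

from concurrent import futures
import psycopg2
import config

def fetch_segment(segment):
    # each process opens its own connection; connections cannot be
    # shared across process boundaries
    conn = psycopg2.connect(host=config.HOST, port=config.PORT,
                            dbname=config.DBNAME, user=config.USER,
                            password=config.PASSWORD)
    try:
        with conn.cursor() as curs:
            curs.execute(config.QUERY2)
            return curs.fetchall()
    finally:
        conn.close()

if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=config.SEGMENTS_NO) as executor:
        results = list(executor.map(fetch_segment, range(config.SEGMENTS_NO)))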

5 Comments

If your theory is correct, then QUERY1 should also serialize, but it does not. See github.com/fithisux/multithreadesqlfetch and run the experiments. Can you post working code or a repo that solves the problem?
No; with QUERY1 all the time is spent in PostgreSQL, not in Python, so it doesn't serialize. It is sending the query and reading the result that serialize, and virtually no time is spent doing those things with QUERY1, so the time spent serialized is negligible.
There is another script, cc_main.py, that has exactly the same behavior without ThreadedConnectionPool. Also, if in the original I replace ThreadedConnectionPool with SimpleConnectionPool, I get the same behavior. Do you have a script that solves (or almost solves) the problem?
See my correction. You said you do see a benefit with CSV, but I don't see an example of that in your git repo. If it is faster to start with and also benefits from parallelism, then clearly that is the way to go.
I upvoted. The CSV approach is indeed faster; possibly I only mentioned it in a comment. But the problem I am trying to solve unfortunately does not work acceptably with CSV.
