5

I want to speed up one of my tasks, so I wrote a small program:

import psycopg2 
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
        count = c.fetchone()
        if count[0] == 0:
            sim_sum = random.random()
            c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum) 
                    VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
            conn.commit()

conn = psycopg2.connect("dbname='db' user='user' host='localhost' password='pass'")
c = conn.cursor()

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=5) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]

The program works correctly for a while, but then I get an error:

Segmentation fault (core dumped)

Or

*** Error in `python3': double free or corruption (out): 0x00007fe574002270 ***
Aborted (core dumped)

If I run this program in one thread, it works great.

with ThreadPoolExecutor(max_workers=1) as pool:

It seems as if PostgreSQL does not have time to process the transactions, but I'm not sure. There are no errors in the log file.

I do not know how to find the error. Help.

2
  • Your approach to the performance problem is very wrong. The threading here is not just a distraction. It adds complexity and gives nothing back. Commented Jan 15, 2016 at 17:55
  • @ClodoaldoNeto What does it matter? If you got this error in production, where would you look for the problem? Commented Jan 15, 2016 at 18:11

2 Answers

5

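I had to use a connection pool so that each thread works with its own connection and cursor. psycopg2 cursors are not thread-safe, and the original code shared a single cursor between all workers.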

import psycopg2 
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from psycopg2.pool import ThreadedConnectionPool

def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        # Each task borrows its own connection instead of sharing one cursor.
        conn = tcp.getconn()
        try:
            c = conn.cursor()
            c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
            count = c.fetchone()
            if count[0] == 0:
                sim_sum = random.random()
                c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum) 
                    VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
                conn.commit()
        finally:
            # Return the connection to the pool even if a query raised.
            tcp.putconn(conn)

DSN = "postgresql://user:pass@localhost/db"
tcp = ThreadedConnectionPool(1, 10, DSN)

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=2) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]
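
The same idea can be written a little more defensively. What follows is only a sketch, not a drop-in replacement: `pooled_connection`, `write_sim` and `fill` are hypothetical helper names, and `pool` is assumed to be any object with `getconn()`/`putconn()` such as the `ThreadedConnectionPool` above. It passes both ids as arguments instead of reading the global `i`, and it creates the executor once rather than once per outer iteration.

```python
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
import random


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from `pool` and always hand it back."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()      # reached only if the body raised nothing
    except Exception:
        conn.rollback()    # leave the connection clean for the next user
        raise
    finally:
        pool.putconn(conn)


def write_sim(pool, id1, id2):
    """Insert one (id1, id2) row unless it already exists (hypothetical helper)."""
    if id1 == id2:
        return False
    with pooled_connection(pool) as conn:
        with conn.cursor() as c:   # one cursor per task, never shared
            c.execute(
                "SELECT count(*) FROM similarity"
                " WHERE prod_id1 = %s AND prod_id2 = %s",
                (id1, id2),
            )
            if c.fetchone()[0] != 0:
                return False
            c.execute(
                "INSERT INTO similarity(prod_id1, prod_id2, sim_sum)"
                " VALUES (%s, %s, %s)",
                (id1, id2, random.random()),
            )
    return True


def fill(pool, ids, workers=2):
    """Run write_sim for every ordered pair on one shared executor."""
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(write_sim, pool, a, b) for a in ids for b in ids]
        # result() re-raises any exception that happened in a worker
        return sum(f.result() for f in futures)
```

With the pool from the answer this would be called as `fill(tcp, range(1000))`. Calling `result()` on each future matters: without it, an exception inside a worker is silently dropped. Note that this version queues every pair up front, so for very large id lists it may be worth submitting in chunks.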

0

This is the sane approach to speed it up. It will be much faster and simpler than your code.

import random

tuple_list = []
for p1 in range(3):
    for p2 in range(3):
        if p1 == p2: continue
        tuple_list.append((p1,p2,random.random()))

insert = """
    insert into similarity (prod_id1, prod_id2, sim_sum)
    select prod_id1, prod_id2, i.sim_sum
    from
        (values
            {}
        ) i (prod_id1, prod_id2, sim_sum)
        left join
        similarity s using (prod_id1, prod_id2)
    where s is null
""".format(',\n            '.join(['%s'] * len(tuple_list)))

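# cur is a cursor on an open psycopg2 connection
print(cur.mogrify(insert, tuple_list).decode())  # mogrify returns bytes on Python 3
cur.execute(insert, tuple_list)
cur.connection.commit()  # persist the inserted rows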

Output:

insert into similarity (prod_id1, prod_id2, sim_sum)
select prod_id1, prod_id2, i.sim_sum
from
    (values
        (0, 1, 0.7316830646236253),
        (0, 2, 0.36642199082207805),
        (1, 0, 0.9830936499726003),
        (1, 2, 0.1401200246162232),
        (2, 0, 0.9921581283868096),
        (2, 1, 0.47250175432277497)
    ) i (prod_id1, prod_id2, sim_sum)
    left join
    similarity s using (prod_id1, prod_id2)
where s is null

BTW there is no need for Python at all. It can all be done in a plain SQL query.
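
As a rough illustration of that remark, the pairs and the random values could be generated on the server side. This is a sketch under assumptions: the ids are taken to be the consecutive integers 0..n-1 as in the question, `FILL_SQL` and `fill_similarity` are hypothetical names, and Python is used only to send the single statement.

```python
# One statement builds every (a, b) pair with a <> b, skips pairs that
# already exist, and lets PostgreSQL's random() supply sim_sum.
FILL_SQL = """
    insert into similarity (prod_id1, prod_id2, sim_sum)
    select a.id, b.id, random()
    from generate_series(0, %(last)s) as a(id)
         cross join generate_series(0, %(last)s) as b(id)
         left join similarity s
                on s.prod_id1 = a.id and s.prod_id2 = b.id
    where a.id <> b.id
      and s.prod_id1 is null
"""


def fill_similarity(cur, n_products):
    """Run the fill statement for ids 0..n_products-1 on an open cursor."""
    # generate_series is inclusive on both ends, hence n_products - 1
    cur.execute(FILL_SQL, {"last": n_products - 1})
```

The caller would still need to commit on the connection afterwards. If the real product ids live in another table, the two `generate_series` calls would be replaced by selects from that table.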

1 Comment

Thank you for your response. But I need to learn how to run a program that works with PostgreSQL from multiple threads. Thanks again.
