
I am trying to pull a 1.7G file into a pandas dataframe from a Greenplum postgres data source. The psycopg2 driver takes 8 or so minutes to load. Using the pandas "chunksize" parameter does not help as the psycopg2 driver selects all data into memory, then hands it off to pandas, using a lot more than 2G of RAM.

To get around this, I'm trying to use a named cursor, but all the examples I've found then loop through the results row by row, and that just seems slow. But the main problem appears to be that my SQL just stops working in the named cursor for some unknown reason.

Goals

  • load the data as quickly as possible without doing any "unnatural acts"
  • use SQLAlchemy if possible - used for consistency
  • have the results in a pandas dataframe for fast in-memory processing (alternatives?)
  • Have a "pythonic" (elegant) solution. I'd love to do this with a context manager but haven't gotten that far yet (a rough sketch follows below).
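
On the context-manager point, a minimal sketch, assuming psycopg2 2.5 or later (where both connections and cursors can be used as context managers) and the connection parameters from the snippet below; it only shows the cleanup pattern, not the chunked fetch itself:

    # Sketch only (assumes psycopg2 >= 2.5): the with-blocks handle
    # commit/rollback and close the cursor automatically. Note that the
    # connection's with-block ends the transaction but does NOT close
    # the connection, so close it explicitly when finished.
    import psycopg2

    with psycopg2.connect(database=database, user=username,
                          password=password, host=hostname) as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT version()')
            print(cur.fetchone())
    conn.close()

Note that psycopg2 intentionally leaves the connection open after its with-block, hence the explicit close(); the failing named-cursor code in question is below.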

    # Named Cursor Chunky Access Test
    import pandas as pd
    import psycopg2
    import psycopg2.extras

    # Connect to database - works
    conn_chunky = psycopg2.connect(
        database=database, user=username, password=password, host=hostname)
    # Open named cursor - appears to work
    cursor_chunky = conn_chunky.cursor(
        'buffered_fetch', cursor_factory=psycopg2.extras.DictCursor)
    cursor_chunky.itersize = 100000

    # This is where the problem occurs - the SQL works just fine in all other tests, returns 3.5M records
    result = cursor_chunky.execute(sql_query)
    # result returns None (normal behavior) but result is not iterable

    df = pd.DataFrame(result.fetchall())
    

The pandas call returns AttributeError: 'NoneType' object has no attribute 'fetchall'. The failure seems to be due to the named cursor being used. I have tried fetchone, fetchmany, etc. Note the goal here is to let the server chunk and serve up the data in large chunks such that there is a balance of bandwidth and CPU usage. Looping through a df = df.append(row) is just plain fugly.
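
For reference, a rough sketch of what a working server-side read might look like: psycopg2's execute() always returns None, so the rows have to be fetched from the named cursor itself, in chunks, and assembled once at the end (conn_chunky and sql_query from the snippet above are assumed):

    # Sketch only, not a tested solution: pull large chunks from the
    # named (server-side) cursor and build the DataFrame with one concat.
    import pandas as pd
    import psycopg2.extras

    chunks = []
    with conn_chunky.cursor('buffered_fetch',
                            cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute(sql_query)            # returns None; the cursor holds the result set
        cols = None
        while True:
            rows = cur.fetchmany(100000)  # one server-side chunk per call
            if not rows:
                break
            if cols is None:
                # on a named cursor, description may only be populated after the first fetch
                cols = [c[0] for c in cur.description]
            chunks.append(pd.DataFrame(rows, columns=cols))
    df = pd.concat(chunks, ignore_index=True)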


Added standard client-side chunking code per request:

    nrows = 3652504
    size = nrows // 1000      # chunksize must be an integer
    idx = 0
    first_loop = True
    for dfx in pd.read_sql(iso_cmdb_base, engine, coerce_float=False, chunksize=size):
        if first_loop:
            df = dfx
            first_loop = False
        else:
            df = df.append(dfx, ignore_index=True)

1 Answer


UPDATE:

    # Chunked access
    import time

    import pandas as pd
    from sqlalchemy import create_engine

    start = time.time()
    engine = create_engine(conn_str)
    size = 10**4
    df = pd.concat((x for x in pd.read_sql(iso_cmdb_base, engine, coerce_float=False, chunksize=size)),
                   ignore_index=True)
    print('time:', (time.time() - start)/60, 'minutes or ', time.time() - start, 'seconds')
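
For comparison with the client-side loop in the question: the single pd.concat over the chunk generator builds the final frame once, instead of re-copying the accumulated data on every df.append call.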

OLD answer:

I'd try to read data from PostgreSQL using the built-in Pandas method read_sql():

    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user@localhost:5432/dbname')

    df = pd.read_sql(sql_query, engine)
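
A related variant worth trying if server-side chunking is still wanted with SQLAlchemy: streaming execution makes the psycopg2 dialect use a server-side cursor under the hood. A sketch, assuming conn_str and iso_cmdb_base from the question:

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine(conn_str)
    # stream_results=True asks the psycopg2 dialect for a server-side cursor,
    # so each chunk is fetched from the server instead of the whole result
    # set being pulled into client memory on execute.
    with engine.connect().execution_options(stream_results=True) as conn:
        df = pd.concat(
            pd.read_sql(iso_cmdb_base, conn, coerce_float=False, chunksize=10**4),
            ignore_index=True)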

Comments

That's exactly what I'm doing on my other queries, but if you notice, I create a named cursor, which is neither a connection nor a query. I haven't tried feeding the dataframe a named cursor yet, nor have I found a good example of this.
@Harvey, I don't quite understand why you need a named cursor - how is it going to help?
Yeah, that barfed up a hairball: DatabaseError: Execution failed on sql '<cursor object at 0x0000000008B28588; closed: 0>': argument 1 must be a string or unicode object. There were bits everywhere. It was horrible. :)
The named cursor allows server-side chunking of the data. For some reason that escapes me, the 1.7G database (once in pandas) uses ~16GB of RAM. Other drivers such as pg8000 use 13GB RAM and take ~20 minutes. The idea was to get the server chunking and the pandas processing time interleaved such that they are both busy at the same time. But first I have to get it working.
@Harvey, I've updated my answer - can you check whether it helps?
