I am trying to create a dataframe from a fixed-width file and load it into a PostgreSQL database. My input file is very large (~16GB, 20 million records), so creating the dataframe consumes most of the available RAM and takes a long time to complete. I therefore thought of using the chunksize option (via a Python generator) and committing records to the table chunk by chunk. But it is failing with AttributeError: 'generator' object has no attribute 'to_sql'.

Inspired by this answer: https://stackoverflow.com/a/47257676/2799214

input file: test_file.txt

XOXOXOXOXOXO9
AOAOAOAOAOAO8
BOBOBOBOBOBO7
COCOCOCOCOCO6
DODODODODODO5
EOEOEOEOEOEO4
FOFOFOFOFOFO3
GOGOGOGOGOGO2
HOHOHOHOHOHO1

sample.py

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    for chunk in pd.read_fwf(filename, colspecs=[[0,12],[12,13]],index_col=False,header=None, iterator=True, chunksize=chunk_size):
        yield (chunk)

def _generator( engine, filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
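    # NOTE: 'chunk' here is the generator object itself, not a DataFrame --
    # calling .to_sql on it is what raises the AttributeError shown below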
    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
    yield row

if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://ABCD:ABCD@ip:port/database')
    c = engine.connect()
    conn = c.connection
    generator = _generator(engine=engine, filename=filename)
    while True:
       print(next(generator))
    conn.close()

Error:

    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
AttributeError: 'generator' object has no attribute 'to_sql'

My primary goal is to improve performance. Please help me resolve the issue, or suggest a better approach. Thanks in advance.

  • chunck_generator is a generator object, which does not have a to_sql() method. You may need to use current_chunk = next(chunk) to get the chunk (see the sketch after these comments). Also, row is not defined. Commented May 1, 2018 at 15:58
  • @TwistedSim Yes, I agree. Is there any way I can resolve this issue? I need to retain the dataframe properties. Commented May 1, 2018 at 16:00
  • Where are you expecting this to_sql method to be defined? Certainly not on all generators, or all iterables, or the specific generator you created by just yielding values from a function. If you want to call a method of a DataFrame, you have to call it on a DataFrame, not on some other kind of object. Commented May 1, 2018 at 16:01
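
A minimal sketch of what the first comment describes: pull a DataFrame out of the generator with next() and call to_sql on that, since to_sql is a DataFrame method (this reuses chunck_generator, filename, and engine from the question's code):

chunks = chunck_generator(filename, header=False, chunk_size=10 ** 5)
chunk = next(chunks)  # a pandas DataFrame, which does have .to_sql
chunk.to_sql('sample_table', engine, if_exists='replace',
             schema='sample_schema', index=False)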

3 Answers

chunck_generator returns a generator object, not an actual chunk. You need to iterate over the object to get the chunks out of it.

>>> def my_generator(x):
...     for y in range(x):
...         yield y
...
>>> g = my_generator(10)
>>> print(g.__class__)
<class 'generator'>
>>> ele = next(g, None)
>>> print(ele)
0
>>> ele = next(g, None)
>>> print(ele)
1

So to fix your code, you just need to loop over the generator inside _generator and call to_sql on each DataFrame chunk:

for chunk in chunck_generator(filename, header=False, chunk_size=10 ** 5):
    yield chunk.to_sql('sample_table', engine, if_exists='append',
                       schema='sample_schema', index=False)

But it seems convoluted. I would just do this:

import pandas as pd
from sqlalchemy import create_engine

def sql_generator(engine, filename, header=False, chunk_size=10 ** 5):
    frame = pd.read_fwf(
        filename,
        colspecs=[[0, 12], [12, 13]],
        index_col=False,
        header=None,
        iterator=True,
        chunksize=chunk_size
    )

    for chunk in frame:
        # use if_exists='append' so each chunk is added to the table;
        # 'replace' would drop and recreate the table on every chunk
        yield chunk.to_sql(
            'sample_table',
            engine,
            if_exists='append',
            schema='sample_schema',
            index=False
        )


if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://USER:PWD@IP:PORT/DB')
    for sql in sql_generator(engine, filename):
        print(sql)

5 Comments

  • Code is working. Can I release each chunk's memory using del or the garbage collector? (See the sketch after these comments.)
  • It should garbage collect on its own.
  • Code is working fine, but to_sql is running very slowly: it took 30 minutes to insert 100k records (100k rows, 98 columns, all text-type columns). Any insights?
  • You don't want to chunk -- you probably want to use a database loading utility that can turn off transactions. Postgres provides a COPY function to make large file loading more efficient. dba.stackexchange.com/questions/151930/…
  • Thanks. I found a way to load 9.8 million records. I used part of your code and the psycopg2 package to load the data. The data loaded in 30 minutes.
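
On the memory question above, a minimal sketch of explicitly releasing each chunk after it is written, assuming the frame and engine objects from this answer's code (in practice CPython frees each DataFrame once the loop variable is rebound, so this is rarely necessary):

import gc

for chunk in frame:
    chunk.to_sql('sample_table', engine, if_exists='append',
                 schema='sample_schema', index=False)
    del chunk     # drop the last reference to this chunk's DataFrame
    gc.collect()  # optionally force a collection pass between chunks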

Conclusion: the to_sql method is not efficient for loading large files. So I used the copy_from method from the psycopg2 package, together with the chunksize option when creating the dataframe. Loaded 9.8 million records (~17GB) with 98 columns each in 30 minutes.

I have removed references to my actual file (I am using the sample file from the original post).

import pandas as pd
import psycopg2
import io

def sql_generator(cur, con, filename, boundries, col_names, header=False, chunk_size=2000000):
    # read the fixed-width file lazily, one DataFrame chunk at a time
    frame = pd.read_fwf(filename, colspecs=boundries, index_col=False, header=None,
                        iterator=True, chunksize=chunk_size, names=col_names)
    for chunk in frame:
        # serialize the chunk to an in-memory, pipe-delimited buffer
        # (quoting=3 is csv.QUOTE_NONE)
        output = io.StringIO()
        chunk.to_csv(output, sep='|', quoting=3, escapechar='\\', index=False,
                     header=False, encoding='utf-8')
        output.seek(0)
        # bulk-load the buffer with COPY, which is much faster than row-by-row INSERTs
        cur.copy_from(output, 'sample_schema.sample_table', null="", sep="|")
        yield con.commit()

if __name__ == "__main__":
    boundries = [[0, 12], [12, 13]]
    col_names = ['col1', 'col2']
    filename = r'test_file.txt'  # refer to the sample file in the original post
    con = psycopg2.connect(database='database', user='username', password='pwd', host='ip', port='port')
    cur = con.cursor()
    for sql in sql_generator(cur, con, filename, boundries, col_names):
        print(sql)
    con.close()
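
One caveat worth checking against your psycopg2 version: newer releases (2.9+) quote the table name passed to copy_from, so a schema-qualified name like 'sample_schema.sample_table' can fail there. A sketch of a drop-in replacement for the copy_from call above, using copy_expert so the qualified name is passed through verbatim:

cur.copy_expert(
    "COPY sample_schema.sample_table FROM STDIN "
    "WITH (FORMAT text, DELIMITER '|', NULL '')",
    output,
)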

I suggested something like this in the comments:

def _generator(engine, filename, ...):
    for chunk in pd.read_fwf(filename, ...):
        yield chunk.to_sql('sample_table', engine, ...)  # not sure about this, since row was not defined

for row in _generator(engine=engine, filename=filename):
    print(row)
