
Requirement: Insert new data and update existing data in bulk (row count > 1000) from a dataframe/CSV (whichever suits) and save it in a PostgreSQL database.

Table: TEST_TABLE

CREATE TABLE TEST_TABLE (
    itemid  varchar(100) NOT NULL PRIMARY KEY,
    title   varchar(255),
    street  varchar(100),
    pincode varchar(100));

INSERT: ['756252', 'tom title', 'APC Road', '598733' ], 
        ['75623', 'dick title', 'Bush Road', '598787' ], 
        ['756211', 'harry title', 'Obama Street', '598733' ]

dataframe content:

data = [['756252', 'tom new title', 'Unknown Road', 'pin changed' ], 
        ['75623', 'dick new title', 'Bush Road changed', '598787 also changed' ], 
        ['756211', 'harry title', 'Obama Street', '598733'],
        ['7562876', 'new1 data title', 'A Street', '598730'],
        ['7562345', 'new2 data title', 'B Street', '598731'],
        ['7562534', 'new3 data title', 'C Street', '598732'],
        ['7562089', 'new4 data title', 'D Street', '598733']] 

df = pd.DataFrame(data, columns = ['itemid', 'title', 'street', 'pincode']) 

I want to UPDATE the records with the same itemid and INSERT the new records. The data will be huge (the CSV file created from the dataframe is more than 50MB).

Programming language used: Python

Database: PostgreSQL

2 Comments

  • Create a temporary table, use COPY to move the data, then do an INSERT ... ON CONFLICT from the temporary table into the actual table. Commented Apr 6, 2020 at 17:15
  • Any suggestion on how to deal with bulk data? It will be a very costly operation if it is done one row at a time in a loop. Commented Apr 7, 2020 at 0:20

1 Answer


In this particular case it is better to drop down to the DB-API level, because you need tools that even SQLAlchemy Core does not expose directly, such as copy_expert(). You can get at those through raw_connection(). If your source data is a CSV file, you do not need pandas for this at all. Start by creating a temporary staging table, COPY the data into it, and then insert from it into the destination table with conflict handling:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()
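
For reference, engine above is just an ordinary SQLAlchemy engine for the PostgreSQL database; a minimal sketch, with a placeholder connection string:

from sqlalchemy import create_engine

# Placeholder DSN; substitute your own user, password, host and database.
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")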

If, on the other hand, your source data is the DataFrame, you can still use COPY by passing a function as the method= argument of to_sql(). That function can even hide all of the above logic:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

You would then upsert the DataFrame using

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Using this method upserting ~1,000,000 rows took about 16s on this machine with a local database.
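
If you want to check that figure on your own setup, a rough sketch with synthetic data (row count and values are arbitrary):

import time

import pandas as pd

# One million synthetic rows with unique itemids.
n = 1_000_000
df = pd.DataFrame({
    "itemid": [str(i) for i in range(n)],
    "title": "a title",
    "street": "A Street",
    "pincode": "598733",
})

start = time.perf_counter()
df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")
print(f"Upserted {n} rows in {time.perf_counter() - start:.1f}s")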


4 Comments

  • What do the ON CONFLICT, DO UPDATE SET and EXCLUDED parts do? Is that what UPDATEs any duplicate-id rows in TEST_TABLE with the values from the TEST_STAGING table? Thanks and good job.
  • Explaining them in the space of a comment would be challenging, so I will refer you to postgresql.org/docs/current/sql-insert.html, but you got the gist of it already.
  • For PostgreSQL, TEST_STAGING needs to be quoted as "TEST_STAGING"
  • It does not need quoting if you consistently use regular identifiers, as is done here in the first part. If you or a library decide to use delimited (quoted) identifiers, then yes.
