
I am using Python to query an external API, transform the data, and write it to an internal PostgreSQL database.

In that process, I compare the API result with the existing data in the database using pandas and generate a single dataframe that contains both the new records and the existing records that have changed.
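One common way to do such a comparison, shown here only as a sketch, is a left merge on all columns with `indicator=True`: API rows that find no identical database row are either new or changed. The helper name `new_or_changed_rows` and the sample data are hypothetical (this is not the OP's `compare_dataframes`), and it assumes both frames share the same columns and dtypes.

```python
import pandas as pd


def new_or_changed_rows(api_df: pd.DataFrame, db_df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows of api_df that have no identical row in db_df.

    Hypothetical helper: a row counts as "new" if its id is missing from
    db_df and as "changed" if any other column differs.
    """
    cols = list(api_df.columns)
    merged = api_df.merge(db_df[cols], on=cols, how="left", indicator=True)
    # "left_only" means no row in db_df matched on every column.
    only_api = merged["_merge"] == "left_only"
    return merged.loc[only_api, cols].reset_index(drop=True)


# Hypothetical data: id 1 is unchanged, id 2 changed, id 3 is new.
api = pd.DataFrame({"id": [1, 2, 3], "txt": ["a", "B", "c"]})
db = pd.DataFrame({"id": [1, 2], "txt": ["a", "b"]})
result = new_or_changed_rows(api, db)
print(result.to_dict("records"))
```

Because the merge is a left join, the result keeps the API frame's row order, and `result.to_dict("records")` is already in the list-of-dicts shape that SQLAlchemy accepts for an executemany.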

What I want to do is hand the dataframe (or a dictionary) over to SQLAlchemy and have it processed so that:

  1. new records are just appended
  2. existing records are updated

This is how I approached it (I am new to Python, so please be patient with my beginner skills):

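from sqlalchemy import MetaData, Table
from sqlalchemy.dialects.postgresql import insert

# engine, get_absence, get_database_absence and compare_dataframes are defined elsewhere
def update_absence(year):
    api_result = get_absence(year)
    db_result = get_database_absence(year)
    # df holds the new rows plus the existing rows that have changed
    df = compare_dataframes(api_result, db_result, 'id')
    metadata_obj = MetaData()
    metadata_obj.reflect(bind=engine)
    some_table = Table("tb_absence", metadata_obj, autoload_with=engine)

    # one INSERT ... ON CONFLICT DO UPDATE per row
    for item in df.to_dict('records'):
        insert_stmt = insert(some_table).values(item).on_conflict_do_update(constraint='tb_absence_pkey', set_=item)
        print(insert_stmt.compile())
        with engine.connect() as conn:
            result = conn.execute(insert_stmt)
            print(result.rowcount)

    conn.commit()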

The output of insert_stmt.compile() is the following:

INSERT INTO tb_absence (id, start_date, end_date, half_day, morning, user_id, employee_id, type, extra_vacation, state, substitute_state, workdays, hours, medical_certificate, comments, substitute_user_id, name) VALUES (%(id)s, %(start_date)s, %(end_date)s, %(half_day)s, %(morning)s, %(user_id)s, %(employee_id)s, %(type)s, %(extra_vacation)s, %(state)s, %(substitute_state)s, %(workdays)s, %(hours)s, %(medical_certificate)s, %(comments)s, %(substitute_user_id)s, %(name)s) ON CONFLICT ON CONSTRAINT tb_absence_pkey DO UPDATE SET id = %(param_1)s, start_date = %(param_2)s, end_date = %(param_3)s, half_day = %(param_4)s, morning = %(param_5)s, user_id = %(param_6)s, employee_id = %(param_7)s, type = %(param_8)s, extra_vacation = %(param_9)s, state = %(param_10)s, substitute_state = %(param_11)s, workdays = %(param_12)s, hours = %(param_13)s, medical_certificate = %(param_14)s, comments = %(param_15)s, substitute_user_id = %(param_16)s, name = %(param_17)s

and the rowcount is 1 for each item I iterate through (the print statements will be replaced by proper log entries once I have understood the approach). However, the database is never updated. I can't really get my head around on_conflict_do_update and how to handle the connection and the engine.

I guess I have some fundamental gaps in my understanding, and the examples in the SQLAlchemy tutorial for this part are hard for me to follow, as they only give small snippets and fragments of the solution. I probably need a complete working example. Reviewing other questions here on SO did not lead to a good understanding on my side either.

All methods for identifying the differences of the datasets are working fine.

I would greatly appreciate any hint that could help me make progress here.

  • You may find this code helpful. Commented Jun 21, 2024 at 12:26
  • Dear Gord, thanks a lot for the suggestion, but this is far beyond my capabilities and I would rather not use it, as I can't fully understand, support, or change it. Commented Jun 21, 2024 at 13:04
  • You need to commit inside the with engine.connect() as conn: block, because it will roll back on exit. Or use with engine.begin() as conn:, which will commit automatically. Commented Jun 21, 2024 at 13:06
  • Keep in mind that your approach will insert/update row-by-row, which will not be a problem for small batches but will start to get very slow for larger batches. Commented Jun 21, 2024 at 13:16
  • Dear @snakecharmerb, thanks for the hint; I had actually already implemented it as you suggested. Still, the row does not get updated. When I delete the complete row in the DB, the update (which is then a plain insert) works. Commented Jun 21, 2024 at 13:24

2 Answers


By looping through for item in df.to_dict('records'): you are creating and sending a separate INSERT for each row. For example, with my table …

import sqlalchemy as sa

some_table = sa.Table(
    "thing",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("txt", sa.String),
)

… and DataFrame …

import pandas as pd

df = pd.DataFrame([(1, "txt_1"), (2, "txt_2")], columns=["id", "txt"])

With engine.echo = True, the log shows that your code results in

INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00090s] {'id': 1, 'txt': 'txt_1', 'param_1': 1, 'param_2': 'txt_1'}
INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00066s] {'id': 2, 'txt': 'txt_2', 'param_1': 2, 'param_2': 'txt_2'}

We can turn that into an "executemany" by using textual SQL:

sql = sa.text("INSERT INTO thing (id, txt) VALUES (:id, :txt) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = :txt")
with engine.begin() as conn:
    conn.execute(sql, df.to_dict("records"))

which produces

INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = %(txt)s
[generated in 0.00086s] [{'id': 1, 'txt': 'txt_1'}, {'id': 2, 'txt': 'txt_2'}]
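A variant of the same textual executemany can be run end to end as a sketch. It assumes SQLite in place of PostgreSQL and names the conflict column instead of the constraint (`ON CONFLICT (id)`), because `ON CONFLICT ON CONSTRAINT` is PostgreSQL-only; both databases accept `excluded.txt`, which refers to the value proposed for the row being inserted. The table and data are hypothetical.

```python
import sqlalchemy as sa

# Assumption: SQLite stands in for PostgreSQL so the sketch runs anywhere.
# "ON CONFLICT (id) ... excluded.txt" is accepted by SQLite (3.24+) and PostgreSQL.
UPSERT = sa.text(
    "INSERT INTO thing (id, txt) VALUES (:id, :txt) "
    "ON CONFLICT (id) DO UPDATE SET txt = excluded.txt"
)

engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE thing (id INTEGER PRIMARY KEY, txt VARCHAR)"))
    conn.execute(sa.text("INSERT INTO thing (id, txt) VALUES (1, 'old')"))
    # One executemany: id 1 already exists (update), id 2 is new (insert).
    records = [{"id": 1, "txt": "txt_1"}, {"id": 2, "txt": "txt_2"}]
    conn.execute(UPSERT, records)
    rows = conn.execute(sa.text("SELECT id, txt FROM thing ORDER BY id")).all()

result = [tuple(r) for r in rows]
print(result)  # [(1, 'txt_1'), (2, 'txt_2')]
```

Using `excluded.txt` instead of re-binding `:txt` in the `SET` clause keeps the parameter list identical to the `VALUES` list, so the same list of records works unchanged.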

Or, SQLAlchemy Core can build the statement for us:

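from sqlalchemy.dialects.postgresql import insert

stmt = insert(some_table)
do_update_stmt = stmt.on_conflict_do_update(
    "thing_pkey", set_={excl.name: excl for excl in stmt.excluded}
)
with engine.begin() as conn:
    conn.execute(do_update_stmt, df.to_dict("records"))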

which produces

INSERT INTO thing (id, txt) VALUES (%(id__0)s, %(txt__0)s), (%(id__1)s, %(txt__1)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = excluded.id, txt = excluded.txt
[no key 0.00017s (insertmanyvalues) 1/1 (unordered)] {'id__0': 1, 'txt__0': 'txt_1', 'id__1': 2, 'txt__1': 'txt_2'}

1 Comment

I understand the general approach, but am still not able to see how this can be put into the "generic" statement like this: insert_stmt = insert(some_table).values(df.to_dict("records")).on_conflict_do_update(constraint='tb_absence_pkey', set_=df.to_dict("records")) with engine.begin() as conn: conn.execute(insert_stmt)
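A sketch of a "generic" version, under stated assumptions: `set_` is not a list of records but a dict mapping each column to update onto an expression, here `stmt.excluded[...]` (the value proposed for that row). The records themselves are passed to `conn.execute()` as the executemany parameters. The helper name `upsert_records` is hypothetical, it matches on the table's primary key via `index_elements`, and it assumes at least one non-key column. SQLite's `insert` is imported only so the sketch runs without a server; for PostgreSQL, `from sqlalchemy.dialects.postgresql import insert` offers the same `on_conflict_do_update(index_elements=..., set_=...)` and `.excluded`, and the table could come from `Table("tb_absence", metadata_obj, autoload_with=engine)` with `records = df.to_dict("records")`.

```python
import sqlalchemy as sa
from sqlalchemy.dialects.sqlite import insert  # on PostgreSQL: sqlalchemy.dialects.postgresql


def upsert_records(conn, table: sa.Table, records: list[dict]) -> None:
    """Hypothetical helper: insert new rows, update existing ones by primary key."""
    if not records:
        return
    stmt = insert(table)
    key_cols = [col.name for col in table.primary_key.columns]
    # set_ maps each non-key column to EXCLUDED.<column>, the incoming value
    # for that particular row; the records themselves are not passed here.
    update_cols = {
        col.name: stmt.excluded[col.name]
        for col in table.columns
        if not col.primary_key
    }
    stmt = stmt.on_conflict_do_update(index_elements=key_cols, set_=update_cols)
    # Passing the list of dicts as parameters runs one executemany.
    conn.execute(stmt, records)


# Usage with a hypothetical table and data (in-memory SQLite).
engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()
thing = sa.Table(
    "thing",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("txt", sa.String),
)
with engine.begin() as conn:
    metadata.create_all(conn)
    conn.execute(sa.insert(thing), [{"id": 1, "txt": "old"}])
    upsert_records(conn, thing, [{"id": 1, "txt": "txt_1"}, {"id": 2, "txt": "txt_2"}])
    result = [tuple(r) for r in conn.execute(sa.select(thing).order_by(thing.c.id))]

print(result)  # [(1, 'txt_1'), (2, 'txt_2')]
```

Leaving the primary key out of `set_` avoids rewriting the key column with the value it already has.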

The problem was in the pandas dataframe comparison: the drop_duplicates operation used the wrong keep parameter (last instead of first).
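Why `keep` matters can be illustrated with a sketch on hypothetical data. It assumes a concat-then-`drop_duplicates` comparison with the database rows first and the API rows second; the OP's actual column names and concat order are not shown, so which value is "right" depends on that order. In this order, `keep="first"` retains the stale database value, and upserting that row would just write back the old values, so the table would look as if it were never updated.

```python
import pandas as pd

# Hypothetical frames: id 7 changed in the API, id 8 is unchanged.
db_df = pd.DataFrame({"id": [7, 8], "state": ["pending", "approved"]})
api_df = pd.DataFrame({"id": [7, 8], "state": ["approved", "approved"]})

# Concatenation order matters: here the DB rows come first, the API rows last.
combined = pd.concat([db_df, api_df], ignore_index=True)

# keep=False drops every fully identical pair, leaving only rows that differ.
changed = combined.drop_duplicates(keep=False)

# Both versions of id 7 remain; keep= decides which one survives per id.
from_db = changed.drop_duplicates(subset="id", keep="first")
from_api = changed.drop_duplicates(subset="id", keep="last")

print(from_db["state"].tolist())   # ['pending']  (stale DB value)
print(from_api["state"].tolist())  # ['approved'] (new API value)
```

With the frames concatenated in the opposite order, the roles flip and `keep="first"` would be the one that selects the API value.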
