The PostgreSQL ON CONFLICT clause in INSERT statements provides "upsert" functionality (i.e. update an existing record, or insert a new one if no such record exists). SQLAlchemy supports this via the on_conflict_do_nothing and on_conflict_do_update methods on the PostgreSQL dialect's Insert object:

from sqlalchemy.dialects.postgresql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value'
)

do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    index_elements=['id']
)

conn.execute(do_nothing_stmt)

do_update_stmt = insert_stmt.on_conflict_do_update(
    constraint='pk_my_table',
    set_=dict(data='updated value')
)

conn.execute(do_update_stmt)
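
To see what these two statements actually send to the server, they can be compiled against the PostgreSQL dialect without opening a connection. This is only a sketch: the table definition below is a hypothetical stand-in for my_table, and the constraint name pk_my_table is taken from the example above rather than from a real schema.

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

# Hypothetical table standing in for my_table in the example above.
metadata = MetaData()
my_table = Table(
    "my_table",
    metadata,
    Column("id", String, primary_key=True),
    Column("data", String),
)

insert_stmt = insert(my_table).values(id="some_existing_id", data="inserted value")

# Conflict target given as column names.
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

# Conflict target given as a constraint name (assumed to exist in the database).
do_update_stmt = insert_stmt.on_conflict_do_update(
    constraint="pk_my_table",
    set_=dict(data="updated value"),
)

# Render the SQL text only; nothing is executed.
do_nothing_sql = str(do_nothing_stmt.compile(dialect=postgresql.dialect()))
do_update_sql = str(do_update_stmt.compile(dialect=postgresql.dialect()))

print(do_nothing_sql)
print(do_update_sql)
```

The first statement should end in an ON CONFLICT (id) DO NOTHING clause and the second in ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET ..., which shows the difference between index_elements (column names) and constraint (a constraint name).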

I am using flask_sqlalchemy, which manages SQLAlchemy's engine, session, and connections for you. To add an element to the database, I create an instance of my model, add it to the database session, and then call commit, something like this:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.dialects.postgresql import UUID

app = Flask(__name__)
db = SQLAlchemy(app)

class MyTable(db.Model):
    id = db.Column(UUID, primary_key=True)
    data = db.Column(db.String)

relation = MyTable(id='123e4567-e89b-12d3-a456-426614174000', data='foo')
db.session.add(relation)
db.session.commit()

So the Insert object is completely wrapped and obscured by flask_sqlalchemy.

How can I access the PostgreSQL-specific dialect methods to perform an upsert? Do I need to bypass flask_sqlalchemy and create my own session? If I do this, how can I ensure no conflicts?

2 Answers

It turns out you can execute a lower-level statement on the db.session. So a solution looks something like this:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.dialects.postgresql import UUID, insert as pg_insert

app = Flask(__name__)
db = SQLAlchemy(app)

class MyTable(db.Model):
    id = db.Column(UUID, primary_key=True)
    data = db.Column(db.String)

    def __init__(self, id, **kwargs):
        # Copy only the expected fields; any other keyword arguments are ignored.
        self.id = id
        self.data = kwargs['data']

    def as_dict(self):
        return {'id': self.id, 'data': self.data}

    def props_dict(self):
        # Everything except the primary key, i.e. the columns to overwrite on conflict.
        d = self.as_dict()
        d.pop('id')
        return d

relation = MyTable(id='123e4567-e89b-12d3-a456-426614174000', data='foo')
# index_elements takes column names; constraint= expects a constraint name, not a column name.
statement = pg_insert(MyTable).\
    values(**relation.as_dict()).\
    on_conflict_do_update(index_elements=['id'],
                          set_=relation.props_dict())

db.session.execute(statement)
db.session.commit()

The as_dict() and props_dict() methods in my model class allow me to use the constructor to filter out unwanted properties from the incoming HTTP request.
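
Note that constraint= expects the name of a constraint (or a constraint object), whereas index_elements= takes column names, so passing a column name such as 'id' as constraint fails with a "constraint does not exist" error. A minimal sketch of a helper that derives the conflict target from the table's primary key and takes the new values from PostgreSQL's EXCLUDED row might look like this. The helper name build_upsert and the item table are hypothetical; with a Flask-SQLAlchemy model you would pass MyTable.__table__.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

# Hypothetical table used only for illustration.
metadata = MetaData()
item = Table(
    "item",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
)


def build_upsert(table, values):
    """Build INSERT ... ON CONFLICT (<primary key>) DO UPDATE for the given values."""
    pk_names = [c.name for c in table.primary_key.columns]
    stmt = pg_insert(table).values(**values)
    # On conflict, overwrite every supplied non-key column with the proposed value.
    update_cols = {k: stmt.excluded[k] for k in values if k not in pk_names}
    if not update_cols:
        # Nothing to update besides the key itself, so just skip the row.
        return stmt.on_conflict_do_nothing(index_elements=pk_names)
    return stmt.on_conflict_do_update(index_elements=pk_names, set_=update_cols)


stmt = build_upsert(item, {"id": 1, "data": "foo"})
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)

key_only_sql = str(build_upsert(item, {"id": 1}).compile(dialect=postgresql.dialect()))
```

The resulting statement can then be passed to db.session.execute(...) followed by db.session.commit(), exactly as in the answer above.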

4 Comments

This fails for me with the error that constraint "id" for table "foobar" does not exist, even though id is created as a db.Column(db.Integer, primary_key=True) field.
This helped me a lot. Worked well for my need to run on_conflict_do_nothing() with some small changes to your code example.
@deed02392 I'm having the same error, did you find a solution?
Hi @avocado, I must have done in the end, but I guess it was something obvious hence I didn't update here again. We can have a chat if you can't figure it out.

An alternative approach using compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert, i.e.
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET foo=EXCLUDED.foo, bar=EXCLUDED.bar
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert

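This should make every INSERT statement compiled by SQLAlchemy behave as an upsert. Because the hook is registered without a dialect name, it applies to all dialects, and it assumes every table has a primary key. The ON CONFLICT text is appended to the end of the compiled statement, so an INSERT that already carries a RETURNING clause would end up with invalid SQL.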
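
If rewriting every INSERT is too broad, one possible variation is to make the behaviour opt-in by compiling a subclass of Insert instead of Insert itself, and to register the hook for PostgreSQL only. The following is a sketch under those assumptions, not a tested drop-in: the Upsert class and the test table are hypothetical names, and the RETURNING limitation is not addressed.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert


class Upsert(Insert):
    """Hypothetical opt-in marker: only statements built from this class are rewritten."""

    # Statement-caching hint for SQLAlchemy 1.4+; a plain unused attribute on older versions.
    inherit_cache = True


@compiles(Upsert, "postgresql")
def compile_upsert(stmt, compiler, **kwargs):
    pk_names = [c.name for c in stmt.table.primary_key]
    # Let the stock compiler render the INSERT, then append the conflict clause.
    insert_sql = compiler.visit_insert(stmt, **kwargs)
    updates = ", ".join(
        f"{c.name}=EXCLUDED.{c.name}"
        for c in stmt.table.columns
        if c.name not in pk_names
    )
    return f"{insert_sql} ON CONFLICT ({', '.join(pk_names)}) DO UPDATE SET {updates}"


# Hypothetical table matching the docstring example (foo is the primary key).
metadata = MetaData()
test = Table(
    "test",
    metadata,
    Column("foo", Integer, primary_key=True),
    Column("bar", String),
)

upsert_sql = str(Upsert(test).values(foo=1, bar="a").compile(dialect=postgresql.dialect()))
plain_sql = str(test.insert().values(foo=1, bar="a").compile(dialect=postgresql.dialect()))

print(upsert_sql)
print(plain_sql)
```

With this arrangement, ordinary inserts (including those issued by db.session.add) are left untouched, and only statements explicitly built with Upsert(...) gain the ON CONFLICT clause.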
