Is there a way to monkeypatch SQLAlchemy so that it intercepts all INSERT INTO statements generated during a session.flush() without sending them to the database? Instead, I want to capture these SQL statements and their parameters so I can later rewrite them into a single multi-row INSERT per table and execute them manually. The goal is to optimize batch inserts for MySQL by using a single SQL network call. My SQLAlchemy version is 1.4.

from pymysql.constants import CLIENT
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()
 
class User(Base):
    __tablename__ = 'users'
    id   = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), nullable=False)
    profiles = relationship(
        'Profile', back_populates='user', cascade='all, delete-orphan'
    )
 
class Profile(Base):
    __tablename__ = 'profiles'
    id      = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    bio     = Column(String(200), nullable=True)
    user    = relationship('User', back_populates='profiles')
 

engine = create_engine(
    'mysql+pymysql://user:pass@host/dbname',
    connect_args={'client_flag': CLIENT.MULTI_STATEMENTS}
)
Session = sessionmaker(bind=engine, autoflush=False)
 

captured_sql = []

# capture every statement and its parameters just before SQLAlchemy hands
# them to the DBAPI cursor (retval=True means the listener must return them)
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    captured_sql.append((statement, parameters))
    return statement, parameters
 
def run_dry_flush(session, obj):
    # attach listener to this Session's SA Connection
    sa_conn = session.connection()
    event.listen(sa_conn, 'before_cursor_execute', before_cursor_execute, retval=True)
    dbapi_conn = sa_conn.connection
    orig_cursor_factory = dbapi_conn.cursor
    def cursor_factory(*args, **kwargs):
        cur = orig_cursor_factory(*args, **kwargs)
        # replace execute with a no-op; SQLAlchemy's before_cursor_execute
        # event has already fired by the time the DBAPI call is reached
        def execute_noop(query, params=None):
            return 0
        cur.execute = execute_noop
        cur.executemany = lambda query, params=None: 0
        return cur
    dbapi_conn.cursor = cursor_factory
 
    # perform ORM ops and flush (SQL captured, not sent)
    session.add(obj)
    session.flush()

    dbapi_conn.cursor = orig_cursor_factory
    event.remove(sa_conn, 'before_cursor_execute', before_cursor_execute)
 

session = Session()
alice = User(name='alice')
alice.profiles.extend([
    Profile(bio='Bio 1'),
    Profile(bio='Bio 2'),
    Profile(bio='Bio 3')
])
run_dry_flush(session, alice)

SQLAlchemy doesn't optimize the SQL statements at all. Is there a way to get:

INSERT INTO users (name, id_parent) VALUES ('bob', 1), ('alice', 2), ('stan', 3);

instead of multiple statements like these:

INSERT INTO users (name, id_parent) VALUES ('bob', 1);

INSERT INTO users (name, id_parent) VALUES ('alice', 2);

I have tried monkeypatching and mocking on a dummy DB session. I would like to avoid writing raw INSERT statements by hand and keep using the ORM functionality. Maybe there is a way to capture the produced SQL statements without sending them to the database, and at the end group them by table and send only one INSERT statement instead of many?
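
To make the "group by table and send one INSERT" step concrete, this is roughly what I imagine doing with the captured statements afterwards. It is only a sketch: replay_grouped is a placeholder name, I assume every captured statement is a plain single-row INSERT, and as far as I understand PyMySQL's executemany() already collapses repeated INSERT ... VALUES statements into a single multi-row INSERT on the wire.

from collections import defaultdict

def replay_grouped(captured, dbapi_conn):
    # rows aimed at the same table and column list compile to the identical
    # SQL string, so the statement text itself works as the grouping key
    grouped = defaultdict(list)
    for statement, parameters in captured:
        grouped[statement].append(parameters)

    cursor = dbapi_conn.cursor()
    try:
        for statement, rows in grouped.items():
            # one executemany() per table; PyMySQL batches these into a
            # multi-row INSERT where it can
            cursor.executemany(statement, rows)
        dbapi_conn.commit()
    finally:
        cursor.close()

# e.g. replay_grouped(captured_sql, engine.raw_connection())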

1 Answer

ORMs in general, and SQLAlchemy in particular, were designed to provide a consistent object-relational mapping, so high-performance bulk inserts are often not among their main features. This article describes the trade-off, first explaining why ordinary unit-of-work inserts are expensive (as opposed to the raw INSERT statements you say you do not want to write by hand):

The SQLAlchemy ORM uses the unit of work pattern when synchronizing changes to the database. This pattern goes far beyond simple “inserts” of data. It includes that attributes which are assigned on objects are received using an attribute instrumentation system which tracks changes on objects as they are made, includes that all rows inserted are tracked in an identity map which has the effect that for each row SQLAlchemy must retrieve its “last inserted id” if not already given, and also involves that rows to be inserted are scanned and sorted for dependencies as needed. Objects are also subject to a fair degree of bookkeeping in order to keep all of this running, which for a very large number of rows at once can create an inordinate amount of time spent with large data structures, hence it’s best to chunk these.

Basically, unit of work is a large degree of automation in order to automate the task of persisting a complex object graph into a relational database with no explicit persistence code, and this automation has a price.

ORMs are basically not intended for high-performance bulk inserts - this is the whole reason SQLAlchemy offers the Core in addition to the ORM as a first-class component.

For the use case of fast bulk inserts, the SQL generation and execution system that the ORM builds on top of is part of the Core. Using this system directly, we can produce an INSERT that is competitive with using the raw database API directly.
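
To illustrate that Core point with the users table from the question (a sketch of my own, not code from the article, reusing the User model and engine defined in the question): handing a list of dictionaries to .values() makes SQLAlchemy 1.4 render one multi-row INSERT ... VALUES (...), (...), (...) statement, which is exactly the statement shape the question asks for.

from sqlalchemy import insert

rows = [{"name": "bob"}, {"name": "alice"}, {"name": "stan"}]

with engine.begin() as conn:
    # renders a single "INSERT INTO users (name) VALUES (...), (...), (...)"
    conn.execute(insert(User.__table__).values(rows))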

However, the alternative you are probably looking for is the Bulk Operations suite of methods. The article benchmarks several approaches; the baseline is the plain ORM version, which adds objects to the session and flushes every 1000 rows:

def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in range(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")

Here, dummy records are created and added to the session, flushing every 1000 rows. The full example is:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in range(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in range(n):
        customer = Customer(id=i + 1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_save_objects(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for chunk in range(0, n, 10000):
        DBSession.bulk_save_objects(
            [
                Customer(name="NAME " + str(i))
                for i in range(chunk, min(chunk + 10000, n))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for chunk in range(0, n, 10000):
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in range(chunk, min(chunk + 10000, n))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_insert_mappings(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in range(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_save_objects(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
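
Adapted to the MySQL setup from the question, the bulk path would look roughly like this (again a sketch, reusing the question's User model and Session factory). bulk_insert_mappings() skips most of the unit-of-work bookkeeping and hands the whole parameter list to the DBAPI in one executemany() call, which PyMySQL can then send as a multi-row INSERT:

# Sketch: bulk insert into the users table from the question.
session = Session()
session.bulk_insert_mappings(
    User,
    [{"name": "NAME " + str(i)} for i in range(10000)],
)
session.commit()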