1

How to ensure each request has its own db.session in a Flask-SQLAlchemy app using Celery and PostgreSQL and being run by Gunicorn?

See below the errors I am getting, the code I am using, and the logs showing the same session being shared across requests. I removed some of the error handling and other code to make it more concise. What am I doing wrong or what else do I need to do? Thanks!

Software Versions

  • flask 2.3.3
  • flask-sqlalchemy 3.0.5
  • sqlalchemy 1.4.49
  • celery 5.4.0
  • gunicorn 23.0.0
  • python 3.11.9
  • (bitnami) postgresql 14.17
  • docker 27.3.1
  • debian 12.8

Errors

In Postgresql

WARNING:  there is already a transaction in progress
WARNING:  there is no transaction in progress

In SQLAlchemy

sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

Code

In run.py

@app.before_request
def get_user():
    """Load the requesting user, count the interaction, and expose it on ``g``.

    Registered as a ``before_request`` handler. ``g.current_user`` ends up as
    the matching ``User`` row, or ``None`` when the lookup raises or finds
    no row.
    """
    pid = os.getpid()
    tid = threading.get_ident()
    # NOTE: the logs show ``db.session`` is a ``scoped_session`` object, i.e. a registry/proxy.
    # ``id(db.session)`` is therefore the id of that proxy, not of the Session it hands out.
    print(f"🔍 {pid=} {tid=} Request: {request.path} db.session ID: {id(db.session)} {db.session=} {db.session.info=}")
    db.session.rollback()  # To clear any stale transaction.
    current_user = None  # Stays None if the lookup below raises.
    try:
        # ``public_id`` is not defined in this snippet -- presumably set by elided code.
        current_user = db.session.query(User).filter_by(public_id=public_id).first()
    except Exception:
        db.session.rollback()
    if current_user is not None:
        try:
            current_user.interactions += 1
            db.session.commit()
        except Exception:
            db.session.rollback()
    g.current_user = current_user

@app.teardown_appcontext
def shutdown_session(exception=None):
    """Remove the scoped ``db.session`` when the application context is torn down.

    Args:
        exception: Argument supplied to the teardown handler; not used here.
    """
    # NOTE(review): Flask-SQLAlchemy presumably registers an equivalent teardown itself,
    # which would make this handler redundant -- confirm against its documentation.
    db.session.remove() # Clean up at the end of the request.

In gunicorn_config.py

# Ensure each worker creates a fresh SQLAlchemy database connection.
def post_fork(server, worker):
    """Gunicorn ``post_fork`` hook: reset the served app's connection pool in the new worker.

    Calling ``create_app()`` here would build a *second* Flask app with its own
    engine, so disposing that engine leaves the pool of the app that actually
    serves requests untouched. The hook therefore works on the application
    object gunicorn serves.

    Args:
        server: The gunicorn arbiter (unused).
        worker: The freshly forked worker; ``worker.app`` is gunicorn's application wrapper.
    """
    # wsgi() returns the WSGI callable gunicorn serves (already loaded when preload_app is True).
    # Assumes that callable is the Flask app itself -- TODO confirm for this deployment.
    flask_app = worker.app.wsgi()
    with flask_app.app_context():
        db.session.remove()
        # close=False discards the inherited pool without closing sockets that the
        # parent process still shares; new connections are opened lazily in this worker.
        db.engine.dispose(close=False)

# Reset database connections when a worker is exiting.
def worker_exit(server, worker):
    """Gunicorn ``worker_exit`` hook: release the served app's database connections.

    Uses the application object gunicorn serves rather than ``create_app()``,
    because a newly created app would own a different engine than the one
    whose connections this worker actually holds.

    Args:
        server: The gunicorn arbiter (unused).
        worker: The exiting worker; ``worker.app`` is gunicorn's application wrapper.
    """
    # Assumes the served WSGI callable is the Flask app itself -- TODO confirm for this deployment.
    flask_app = worker.app.wsgi()
    with flask_app.app_context():
        db.session.remove()
        db.engine.dispose()

preload_app = True  # Loads the application before forking workers.
workers = multiprocessing.cpu_count() * 2 + 1  # Worker process count: two per CPU core, plus one.
threads = 4  # Threads per worker process.
worker_exit = worker_exit  # No-op: rebinds the hook function defined above to its own name.
worker_class = "gthread"  # Threaded worker type.
keepalive = 4  # seconds
timeout = 60  # seconds
graceful_timeout = 30  # seconds
daemon = False
post_fork = post_fork  # No-op: rebinds the hook function defined above to its own name.
max_requests = 1000  # Restart workers after handling 1000 requests (prevents memory leaks).
max_requests_jitter = 50  # Adds randomness to avoid all workers restarting simultaneously.
limit_request_line = 4094
limit_request_field_size = 8190
bind = "0.0.0.0:5555"  # Listen on all interfaces, port 5555.
backlog = 2048
accesslog = "-"
errorlog = "-"
loglevel = "debug"
capture_output = True
enable_stdio_inheritance = True
proc_name = "myapp_api"
# NOTE(review): '*' presumably trusts forwarded headers from any peer address --
# confirm a trusted proxy always sits in front of this server.
forwarded_allow_ips = '*'
secure_scheme_headers = { 'X-Forwarded-Proto': 'https' }
# TLS certificate and key paths; overridable through environment variables.
certfile = os.environ.get('GUNICORN_CERTFILE', 'cert/self_signed_backend.crt')
keyfile = os.environ.get('GUNICORN_KEYFILE', 'cert/self_signed_backend.key')
ca_certs = '/etc/ssl/certs/ca-certificates.crt'

In Celery myapp/tasks.py

@shared_task()
def do_something() -> None:
    """Run a unit of database work in a session owned solely by this task run."""
    with current_app.app_context():
        # The session is used as a context manager so it is closed on exit,
        # including when the body raises.
        with sessionmaker(bind=db.engine)() as session:
            # Do something with the database.
            pass

In myapp/extensions.py

from flask_sqlalchemy import SQLAlchemy
# Shared Flask-SQLAlchemy extension object; create_app() binds it to an app via db.init_app(app).
db = SQLAlchemy()

In myapp/__init__.py

def create_app() -> Flask:
    """Build the Flask application and bind the shared ``db`` extension to it.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    app.config.from_object(ConfigDefault)
    db.init_app(app)
    # Callers use the result directly (e.g. ``create_app().app_context()``), so it must be returned.
    return app

In myapp/config.py

class ConfigDefault:
    """Default Flask configuration: database URI and SQLAlchemy engine/pool options."""

    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = (
        f"postgresql+psycopg2://{SQL_USER}:{SQL_PASSWORD}@{SQL_HOST}:{SQL_PORT}/{SQL_DATABASE}"
    )
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_pre_ping": True,    # Ensures connections are alive before using
        "pool_recycle": 1800,     # Recycle connections after 30 minutes
        "pool_size": 10,          # Number of persistent connections in the pool
        "max_overflow": 20,       # Allow temporary connections beyond pool_size
        "pool_timeout": 30,       # Wait time in seconds before raising connection timeout
    }

Logs

Showing the same thread ID and session ID for all requests:

🔍 pid=38 tid=139541851670208 Request: /v1/user/signup db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/user/login db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/user db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/dependent db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=36 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=40 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=33 tid=139541851670208 Request: /v1/user db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=40 tid=139541851670208 Request: /v1/user db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=33 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=38 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=40 tid=139541851670208 Request: /v1/mw/settings db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=38 tid=139541851670208 Request: /v1/user db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=36 tid=139541851670208 Request: /v1/user db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=38 tid=139541851670208 Request: /v1/a/v db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=36 tid=139541851670208 Request: /v1/a/v db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/p/lt db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=36 tid=139541851670208 Request: /v1/p/l db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=38 tid=139541851670208 Request: /v1/p/l db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=33 tid=139541851670208 Request: /v1/t/t db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=34 tid=139541851670208 Request: /v1/t/t db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
🔍 pid=38 tid=139541851670208 Request: /v1/t/t db.session ID: 139542154775568 db.session=<sqlalchemy.orm.scoping.scoped_session object at 0x7ee9b0910c10> db.session.info={}
ERROR:myapp_api:Exception on /v1/mw/settings [PATCH]
sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
'🔍 pid=38 tid=139541851670208 session_id=139542154775568 'INFO:sqlalchemy.engine.Engine:ROLLBACK

1 Answer 1

0

For Celery worker db sessions I used this in myapp/__init__.py:

def celery_init_app(app: Flask) -> Celery:
    """Create the Celery app whose tasks run in ``app``'s context with a per-task session.

    Args:
        app: The Flask application whose context and ``db.engine`` the tasks use.
    """
    class FlaskDatabaseTask(Task):
        """Task base class that opens one SQLAlchemy session per task id."""

        def __init__(self):
            super().__init__()
            # Open sessions keyed by task id: added in before_start, removed in after_return.
            self.sessions = {}

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Run the task body inside the Flask application context.
            with app.app_context():
                return super().__call__(*args, **kwargs)

        def before_start(self, task_id, args, kwargs):
            # db.engine is resolved inside the app context; the session outlives the context block.
            with app.app_context():
                CelerySession = sessionmaker(bind=db.engine)
                session = CelerySession()
                self.sessions[task_id] = session
            super().before_start(task_id, args, kwargs)

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            # Tolerate a missing entry (e.g. before_start never registered a session for
            # this id) instead of raising KeyError from the cleanup hook.
            session = self.sessions.pop(task_id, None)
            if session is not None:
                session.close()
            super().after_return(status, retval, task_id, args, kwargs, einfo)

        @property
        def session(self):
            """The session registered for the currently executing task id."""
            return self.sessions[self.request.id]

    celery_app = Celery(app.name, task_cls=FlaskDatabaseTask)
    # [...]

For Flask REST API request db sessions I used this in run.py:

@app.before_request
def get_user():
    """Create a dedicated SQLAlchemy session for this request and stash it on ``g``."""
    # g.FlaskSession holds the scoped_session registry (removed again in the teardown
    # handler); g.db_session is the session obtained from that registry.
    g.FlaskSession = scoped_session(sessionmaker(bind=db.engine))
    g.db_session = g.FlaskSession()
    # [...]

# Using @app.teardown_request instead of @app.teardown_appcontext because this is only for flask http request sessions.
# Celery task sessions are handled in FlaskDatabaseTask.
@app.teardown_request
def shutdown_session(exception=None):
    """Close the request's session and remove its registry, if either was stored on ``g``.

    Args:
        exception: Argument supplied to the teardown handler; not used here.
    """
    # Either attribute may be absent, so each one is looked up with a None default.
    if (request_session := getattr(g, "db_session", None)) is not None:
        request_session.close()
    if (session_registry := getattr(g, "FlaskSession", None)) is not None:
        session_registry.remove()

It's not perfect but it's working much better.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.