I am trying to create an async connection using psycopg3. I was using psycopg2 without async and need to move to async database functions. The docs do not give much information.

This is what I was using with psycopg2, and it worked well.

con = psycopg2.connect(host="HOSTNAME", port="PORT", database="DATABASE", user="USER", password="PASSWORD")
cursor = con.cursor()

Then when I needed to run a query I would just use

cursor.execute(query, params)
cursor.fetchall()  # or con.commit(), depending on whether the statement is a SELECT or an INSERT

Now that I am moving to async functions, I have tried this

con = await psycopg.AsyncConnection.connect(host="HOSTNAME", port="PORT", database="DATABASE", user="USER", password="PASSWORD")
cursor = await con.cursor()

But I get the error that I cannot use await outside of a function.
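
This error appears because `await` is only valid inside an `async def` function, not at the top level of a module. A minimal sketch of the required shape follows. `fake_connect` is a hypothetical stand-in for `psycopg.AsyncConnection.connect(...)` so the snippet runs without a database server, and `main` and `run` are illustrative names only.

```python
import asyncio


async def fake_connect():
    # Hypothetical stand-in for `await psycopg.AsyncConnection.connect(...)`.
    await asyncio.sleep(0)
    return "connection"


async def main():
    # `await` is legal here because we are inside an `async def`.
    con = await fake_connect()
    return con


def run():
    # asyncio.run() starts an event loop and drives main() to completion.
    return asyncio.run(main())
```

In a program that already runs its own event loop (a Discord bot, for example), the `await` would go inside one of the existing coroutines instead of calling `asyncio.run()` again.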

The docs tell me to do this

async with await psycopg.AsyncConnection.connect() as aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

So do I need to write this in every function that I want to either read or write records with?

Here are a couple of examples from my current psycopg2 code:

async def check_guild(guild_id):
    cursor.execute("SELECT guild_id, guild_name, su_id FROM guild WHERE guild_id = %s", [guild_id])
    guild = cursor.fetchone()
    return guild

async def config_raffle(guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee):
    try:
        cursor.execute("""INSERT INTO raffle_config (guild_id, channel_id, channel_name, channel_cat_id, token, default_token, default_address, su_id, fee) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (guild_id) DO UPDATE SET channel_id = EXCLUDED.channel_id, channel_name = EXCLUDED.channel_name, channel_cat_id = EXCLUDED.channel_cat_id, token = EXCLUDED.token,
        default_token = EXCLUDED.default_token, default_address = EXCLUDED.default_address, su_id = EXCLUDED.su_id, fee = EXCLUDED.fee""",
                       (guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee))
        con.commit()
    except Exception:
        logging.exception("Exception", exc_info=True)
        con.rollback()
        print("Error: 25")
    return True

So I am thinking my better option may be to use AsyncConnectionPool. I have a db.py file set up like this:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)


async def open_pool():
    await pool.open()

I open the pool when my program runs the on_ready function. I created new tables this way just fine, but when I try to retrieve records I get this error.

discord.ext.commands.errors.CommandInvokeError: Command raised an exception: AttributeError: 'AsyncConnection' object has no attribute 'fetchone'
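
The `AttributeError` suggests that `fetchone()` was called on the connection rather than on a cursor. In psycopg 3 the fetch methods live on the cursor, and `AsyncConnection.execute()` returns one. A minimal sketch, assuming `pool` is an opened `AsyncConnectionPool` like the one in db.py; the function name `fetch_guild` is hypothetical, and the query is the one from `check_guild` above:

```python
async def fetch_guild(pool, guild_id):
    # Borrow a connection from the pool; it goes back when the block exits.
    async with pool.connection() as conn:
        # conn.execute() creates a cursor, runs the query, and returns the cursor.
        cur = await conn.execute(
            "SELECT guild_id, guild_name, su_id FROM guild WHERE guild_id = %s",
            [guild_id],
        )
        # fetchone() is a cursor method, not a connection method.
        return await cur.fetchone()
```

Passing `pool` as a parameter is only a choice made for this sketch; importing the module-level `pool` from db.py would work the same way.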
  • 1) From the docs Async: which can be condensed into async with await: …but no less than that: you still need to do the double async thing. 2) Why do you need to move to async? Commented Jan 9, 2023 at 19:42
  • It's for a Discord bot and I'm running into blocking issues when there is a large database write. Commented Jan 9, 2023 at 20:55
  • Then you will need to either 1) Follow the form for using async as shown in the psycopg(3) docs or 2) Break the write down into smaller chunks Commented Jan 9, 2023 at 21:31
  • Please add the code to the question, properly formatted. Commented Jan 9, 2023 at 22:56

1 Answer

I ended up sorting it out this way:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)


async def open_pool():
    await pool.open()
    await pool.wait()
    print("Connection Pool Opened")

async def select_fetchall(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            results = await cursor.fetchall()
            return results


async def write(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            # Case-insensitive check, so a lowercase "returning" also matches.
            if 'RETURNING' in query.upper():
                results = await cursor.fetchone()
                return results
            else:
                return

Then I just call the functions when I need to read or write to the database and pass the query and args.
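
As a variation on `write` above, one could check `cursor.description` instead of searching the query text for `RETURNING`, since psycopg sets it to `None` when the statement produced no result set. A sketch under that assumption; `write_returning` is a hypothetical name, and `pool` is passed in explicitly so the function does not depend on module state:

```python
async def write_returning(pool, query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            # description is None when the statement returned no rows to fetch.
            if cursor.description is None:
                return None
            return await cursor.fetchone()
```

Leaving the `pool.connection()` block normally commits the open transaction, and leaving it through an exception rolls it back, so no explicit `commit()` call is needed in either version.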

1 Comment

Hey @rolandj, how is the above design working for you? Is it truly async and non-blocking under load?
