Tomas Svojanovsky

Asyncpg: Asynchronous Alternative to Psycopg

Blocking libraries won’t work seamlessly with coroutines. To run queries concurrently against a database, we’ll need to use an asyncio-friendly library that uses non-blocking sockets. To do this, we’ll use a library called asyncpg, which will let us asynchronously connect to Postgres databases and run queries against them.
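
To see why this matters, here is a small sketch that needs no database. It uses time.sleep as a stand-in for a blocking driver call and asyncio.sleep as a stand-in for a non-blocking one. The function names are made up for illustration and are not part of asyncpg.

```python
import asyncio
import time


async def blocking_query(delay: float) -> None:
    # Stand-in for a blocking driver call: time.sleep never yields to the event loop.
    time.sleep(delay)


async def non_blocking_query(delay: float) -> None:
    # Stand-in for a non-blocking call: awaiting hands control back while we wait.
    await asyncio.sleep(delay)


async def timed(query, delay: float = 0.2) -> float:
    """Run two 'queries' with gather and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(query(delay), query(delay))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_query))
non_blocking_elapsed = asyncio.run(timed(non_blocking_query))
print(f"blocking: {blocking_elapsed:.2f}s, non-blocking: {non_blocking_elapsed:.2f}s")
```

The blocking pair runs one after the other even though it is wrapped in coroutines, while the non-blocking pair overlaps and finishes in roughly the time of a single wait.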

Supabase

We will use Supabase, which lets you set up a database with almost no effort. It can do much more, but for our purposes we only need a database.

Below are three steps we need to get started with the DB.

Creating a database

Copy the password; we will need it later.

Supabase database setup

Getting the credentials

From this page, you can get the credentials for the DB connection.

Db credentials

That’s it. Good job! In a few steps, we have the database ready.

Install dependencies

pip install asyncpg

Connecting to a Postgres database

We connect to our Postgres instance using the credentials from Supabase. Once the script runs, we’ve connected! But nothing is stored in our database yet.

import asyncpg
import asyncio


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    version = connection.get_server_version()
    print(f"Connected! Postgres version is {version}")
    await connection.close()


asyncio.run(main())

# Connected! Postgres version is ServerVersion(major=15, minor=0, micro=1, releaselevel='final', serial=0)
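
The examples in this article keep the credentials inline for brevity. As a hedged alternative, the sketch below reads them from environment variables instead. The helper names connection_settings and connect are hypothetical, and the variable names (PGHOST, PGPORT, PGUSER, PGDATABASE, PGPASSWORD) are an assumption about how you store your secrets.

```python
import os


def connection_settings(env=None) -> dict:
    """Build keyword arguments for asyncpg.connect() from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env["PGHOST"],  # raises KeyError if missing, so misconfiguration fails loudly
        "port": int(env.get("PGPORT", "5432")),
        "user": env.get("PGUSER", "postgres"),
        "database": env.get("PGDATABASE", "postgres"),
        "password": env["PGPASSWORD"],
    }


async def connect():
    # Imported here so the helper above can be used and tested without the driver installed.
    import asyncpg

    return await asyncpg.connect(**connection_settings())
```

With this in place, `connection = await connect()` could replace the longer asyncpg.connect(...) call, and the password stays out of the source file.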

Defining a database schema

Using asyncpg, we’ll execute the statement to create our users table.

import asyncpg
import asyncio

CREATE_USERS_TABLE = (
    """
    CREATE TABLE IF NOT EXISTS users (
        user_id SERIAL PRIMARY KEY,
        first_name TEXT NOT NULL,
        last_name TEXT NOT NULL
    );"""
)


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    status = await connection.execute(CREATE_USERS_TABLE)
    print(status)  # CREATE TABLE

    await connection.close()


asyncio.run(main())

Result

You can check in Supabase that the table was created.

Table in Supabase

Inserting data into the database

Note that execute is a coroutine, so to run our SQL we need to await the call.

import asyncpg
import asyncio


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    await connection.execute("INSERT INTO users VALUES(DEFAULT, 'Tomas', 'Svojanovsky')")
    await connection.execute("INSERT INTO users VALUES(DEFAULT, 'John', 'Doe')")

    await connection.close()


asyncio.run(main())

Result

Hooray! We have our data in the DB.

Insert data in the DB
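
The inserts above embed the values directly in the SQL string. When the values come from user input, it is usually safer to pass them as query arguments. The sketch below assumes the users table defined earlier and an already open connection. The helper name insert_users is hypothetical.

```python
INSERT_USER = "INSERT INTO users (first_name, last_name) VALUES ($1, $2)"


async def insert_users(connection, users) -> None:
    """Insert (first_name, last_name) pairs through an open asyncpg connection."""
    # $1 and $2 are placeholders; the values travel separately from the SQL text,
    # so they are never spliced into the statement as strings.
    await connection.executemany(INSERT_USER, users)
```

Inside main() this could be called as `await insert_users(connection, [("Jane", "Roe"), ("Max", "Mustermann")])`. Naming the columns also means user_id is filled in by SERIAL without writing DEFAULT.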

Fetching data from the database

If we wanted to fetch a single result, we could call connection.fetchrow(), which will return a single record from the query. The default asyncpg connection will pull all results from our query into memory, so for the time being there is no performance difference between fetchrow and fetch.

import asyncpg
import asyncio
from asyncpg import Record


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    result: list[Record] = await connection.fetch("SELECT user_id, first_name, last_name FROM users")

    for row in result:
        print(*row)

    await connection.close()


asyncio.run(main())

# 1 Tomas Svojanovsky
# 2 John Doe
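
For comparison, here is a minimal sketch of the fetchrow variant mentioned above. It assumes the same users table and an open connection. The helper name fetch_full_name is made up for this example.

```python
from typing import Optional

USER_BY_ID = "SELECT user_id, first_name, last_name FROM users WHERE user_id = $1"


async def fetch_full_name(connection, user_id: int) -> Optional[str]:
    """Return 'first last' for one user, or None when no row matches."""
    row = await connection.fetchrow(USER_BY_ID, user_id)
    if row is None:  # fetchrow returns None when the query yields no rows
        return None
    # Record values can be read by column name as well as by position.
    return f"{row['first_name']} {row['last_name']}"
```

Checking for None before indexing avoids a TypeError when the requested id does not exist.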

Multiple queries

Oops. Running two queries concurrently on a single connection gives us an error.

We have only one connection. One connection means one socket connection to our database. Since we have only one connection and we’re trying to read the results of multiple queries concurrently, we experience an error.

We can resolve this by creating multiple connections to our database and executing one query per connection. Since creating connections is resource-expensive, it makes sense to cache them so that we can reuse them when needed.

We will need a connection pool to resolve this issue.

import asyncpg
import asyncio
from asyncpg import Record


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    query = "SELECT user_id, first_name, last_name FROM users"

    queries = [
        connection.execute(query),
        connection.execute(query),
    ]

    result: tuple[Record] = await asyncio.gather(*queries)

    for row in result:
        print(*row)

    await connection.close()


asyncio.run(main())

# asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
# Task was destroyed but it is pending!

Connection pool

We first acquire a connection from the pool with pool.acquire(). This coroutine will then suspend running until a connection is available from the connection pool. We do this in an async with block; this will ensure that once we leave the block, the connection will be returned to the pool.

import asyncpg
import asyncio
from asyncpg import Record


async def fetch_user(pool, query, id):
    async with pool.acquire() as connection:
        return await connection.fetchrow(query, id)


async def main():
    async with asyncpg.create_pool(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                   port=5432,
                                   user="postgres",
                                   database="postgres",
                                   password="xW3OEGMQot6xvB7l",
                                   max_size=10,
                                   min_size=10,
                                   ) as pool:

        query = "SELECT user_id, first_name, last_name FROM users WHERE user_id = $1"

        queries = [
            fetch_user(pool, query, 1),
            fetch_user(pool, query, 2),
        ]

        # gather returns a list with one Record per fetch_user call
        result: list[Record] = await asyncio.gather(*queries)

        for row in result:
            print(*row)


asyncio.run(main())

# 1 Tomas Svojanovsky
# 2 John Doe
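
To make the "suspend until a connection is available" behaviour concrete, here is a toy pool built on asyncio.Queue. It is only a teaching sketch and not how asyncpg implements its pool. ToyPool, worker, and demo are hypothetical names, and the "connections" are plain strings.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Illustrative pool: hands out a fixed set of 'connections' via a queue."""

    def __init__(self, size: int) -> None:
        self._free: asyncio.Queue = asyncio.Queue()
        for number in range(size):
            self._free.put_nowait(f"conn-{number}")

    @asynccontextmanager
    async def acquire(self):
        connection = await self._free.get()  # suspends while the pool is empty
        try:
            yield connection
        finally:
            self._free.put_nowait(connection)  # always hand the connection back


async def worker(pool: ToyPool, in_use: list, peak: list) -> None:
    async with pool.acquire():
        in_use[0] += 1
        peak[0] = max(peak[0], in_use[0])
        await asyncio.sleep(0.01)  # pretend to run a query
        in_use[0] -= 1


async def demo(pool_size: int = 2, workers: int = 5) -> int:
    """Run more workers than connections and report the peak number in use."""
    pool = ToyPool(pool_size)
    in_use, peak = [0], [0]
    await asyncio.gather(*(worker(pool, in_use, peak) for _ in range(workers)))
    return peak[0]


peak_in_use = asyncio.run(demo())
print(peak_in_use)  # 2
```

Five workers compete for two connections, yet no more than two are ever in use at once. The remaining workers wait inside acquire() until a connection is returned, which is the same contract pool.acquire() gives us in the example above.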

Transactions

The async with connection.transaction() block starts a transaction and commits it when the block exits without an error. This example assumes the statement runs without errors, so the transaction is committed and we should see the data.

import asyncpg
import asyncio
from asyncpg import Record


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    async with connection.transaction():
        result: list[Record] = await connection.fetch("SELECT user_id, first_name, last_name FROM users")

        for row in result:
            print(*row)

    await connection.close()

asyncio.run(main())

Transaction with rollback

Here we attempt to insert a row with a primary key that already exists. The insert raises an exception inside the transaction block, so the transaction is rolled back and nothing is written to the table.

import asyncpg
import asyncio


async def main():
    connection = await asyncpg.connect(host="db.fygynsqqzskzevaplkjz.supabase.co",
                                       port=5432,
                                       user="postgres",
                                       database="postgres",
                                       password="xW3OEGMQot6xvB7l")

    try:
        async with connection.transaction():
            await connection.execute("INSERT INTO users VALUES(1, 'tomas', 'svojanovsky')")
    except Exception as e:
        print(e)
    finally:
        await connection.close()


asyncio.run(main())

# duplicate key value violates unique constraint "users_pkey"
# DETAIL:  Key (user_id)=(1) already exists.
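
The rollback becomes easier to observe when the transaction contains more than one statement. The sketch below is one way to check it, assuming the users table from earlier and an open connection. The helper name insert_pair_atomically is hypothetical, and like the example above it catches a broad Exception for simplicity.

```python
ADD_USER = "INSERT INTO users (user_id, first_name, last_name) VALUES ($1, $2, $3)"
COUNT_BY_ID = "SELECT count(*) FROM users WHERE user_id = $1"


async def insert_pair_atomically(connection, new_id: int, duplicate_id: int) -> int:
    """Try two inserts in one transaction; return how many rows exist for new_id."""
    try:
        async with connection.transaction():
            # The first insert succeeds on its own...
            await connection.execute(ADD_USER, new_id, "Jane", "Roe")
            # ...but the second one reuses an existing key and raises.
            await connection.execute(ADD_USER, duplicate_id, "tomas", "svojanovsky")
    except Exception as error:
        print(f"Transaction rolled back: {error}")
    # If the rollback worked, the first insert is gone as well.
    return await connection.fetchval(COUNT_BY_ID, new_id)
```

Calling it with an unused new_id and duplicate_id=1 should return 0: the first insert was valid, but it was undone together with the failing one.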

Thanks for reading my article!

If you enjoyed the read and want to be part of our growing community, hit the follow button, and let’s embark on a knowledge journey together.

Your feedback and comments are always welcome, so don’t hold back!
