avatarAhmed Nafies

Summary

The provided content is a comprehensive tutorial on integrating FastAPI with SQLAlchemy's Async ORM and Alembic for database migrations, demonstrating how to build an asynchronous CRUD application with a PostgreSQL database.

Abstract

The tutorial outlines the steps to set up a FastAPI application with asynchronous support using SQLAlchemy's Async ORM, which now includes async functionality. It begins by detailing the necessary dependencies, including FastAPI, SQLAlchemy 1.4+, Alembic, asyncpg, and uvicorn, and guides the reader through creating a virtual environment and installing these packages. The use of Docker for running a PostgreSQL database is explained, followed by instructions on testing the database connection. The tutorial then covers the application's directory structure, configuration, and database session setup. It provides a sample User model and demonstrates how to perform CRUD operations asynchronously. The Alembic tool is introduced for database migrations, with steps on initializing and running migrations. The article concludes with the implementation of CRUD views in FastAPI, the entry point configuration for the application, and instructions on testing the API endpoints using the built-in FastAPI documentation and httpie. The full code for the tutorial is available on GitHub.

Opinions

  • The author recommends checking out a related tutorial on SQLAlchemy Async ORM before proceeding.
  • The author suggests using Docker and Pipenv for the development environment but notes that one can use their preferred Python package manager.
  • The author emphasizes the power of Docker in managing the PostgreSQL database instance.
  • The author highlights the importance of using Alembic for database schema migrations, particularly when working with async ORM.
  • The author encourages the use of the OpenAPI docs generated by FastAPI for API testing but also recommends httpie as an alternative testing tool.
  • The author provides a hint about a new article related to SQLAlchemy 2.0, implying that readers should keep an eye on further developments and improvements in the ecosystem.

[Tutorial] FastAPI with SQLAlchemy Async ORM and Alembic

If you go through the FastAPI docs, you will see that the recommended way to use SQLAlchemy asynchronously is by using the package databases and SQLAlchemy core since SQLAlchemy ORM did not have support for async operations. I have already created a tutorial describing how to use FastAPI with databases with SQLAlchemy core. But now SQLAlchemy ORM with support for async is finally here. The full code can be found here on Github.

I would recommend checking out this tutorial first.

First, let's start with our development environment, I will be using Docker and Pipenv but feel free to use your favorite Python package manager. I’ll be using Python 3.9.

Dependencies

  1. fastapi
  2. sqlalchemy ≥ 1.4
  3. Alembic
  4. asyncpg
  5. uvicorn

let’s create our virtual-env and install our initial dependencies.

pipenv install fastapi sqlachemy alembic asyncpg uvicorn

Postgres

we will utilize the power of Docker to pull a Postgres db.

docker run \
  --rm   \
  --name  postgres \
  -p 5432:5432 \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=postgres \
  -d postgres

Let’s test our database.

docker exec -it postgres psql -U postgres

To exit from the Postgres shell.

#\q

Application

Here is our directory structure

dir
|__ config.py
|__ database.py
|__ models.py
|__ views.py
|__ main.py

Application Config

Create config.py file and add the database configuration.

import os
class Config:
    DB_USER = os.getenv("DB_USER", "postgres")
    DB_PASSWORD = os.getenv("DB_PASSWORD", "postgres")
    DB_NAME = os.getenv("DB_NAME", "postgres")
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_CONFIG = f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"

Database Session Config

Create database.py and add the database session config.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from config import Config
Base = declarative_base()
class AsyncDatabaseSession:
    def __init__(self):
        self._session = None
        self._engine = None
def __getattr__(self, name):
        return getattr(self._session, name)
def init(self):
        self._engine = create_async_engine(
            Config.DB_CONFIG,
            future=True,
            echo=True,
        )
        self._session = sessionmaker(
            self._engine, expire_on_commit=False, class_=AsyncSession
        )()
async def create_all(self):
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
db=AsyncDatabaseSession()

Database Models

Create models.py file and let’s add a simple user model.

from sqlalchemy import Column, String, DateTime
from sqlalchemy import update as sqlalchemy_update
from sqlalchemy.future import select
from datetime import datetime
from database import Base, db
from uuid import uuid4
class User(Base):
    __tablename__ = "users"
    id = Column(String, primary_key=True)
    full_name = Column(String)
    created_at = Column(DateTime, index=True, default=datetime.utcnow)
    def __repr__(self):
        return (
            f"<{self.__class__.__name__}("
            f"id={self.id}, "
            f"full_name={self.full_name}, "
            f")>"
        )
    @classmethod
    async def create(cls, **kwargs):
        user = cls(id=str(uuid4()), **kwargs)
        db.add(user)
        try:
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return user
    @classmethod
    async def update(cls, id, **kwargs):
        query = (
            sqlalchemy_update(cls)
            .where(cls.id == id)
            .values(**kwargs)
            .execution_options(synchronize_session="fetch")
        )
            await db.execute(query)
        try:
            await db.commit()
        except Exception:
            await db.rollback()
            raise
    @classmethod
    async def get(cls, id):
        query = select(cls).where(cls.id == id)
        users = await db.execute(query)
        (user,) = users.first()
        return user
    @classmethod
    async def get_all(cls):
        query = select(cls)
        users = await db.execute(query)
        users = users.scalars().all()
        return users
    @classmethod
    async def delete(cls, id):
        query = sqlalchemy_delete(cls).where(cls.id == id)
        await db.execute(query)
        try:
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return True

Alembic

Initialize alembic

alembic init -t async migrations

Now you will see a file and a subdirectory added.

dir
|__ config.py
|__ database.py
|__ models.py
|__ views.py
|__ main.py
|__ alembic.ini
|__ migrations
   |__ versions
   |__ env.py

Now we will need to edit migrations/env.pyto get the database URL from config.py

import asyncio
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
import models
target_metadata = models.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
from config import Config
config.set_main_option("sqlalchemy.url", Config.DB_CONFIG)
def run_migrations_offline():
    """Run migrations in 'offline' mode.
...

Now we could create and run our first database migration.

alembic revision --autogenerate -m 'adds user table'
alembic upgrade head

let’s create our first user, open python shell.

>>> from database import db
>>> db.init()
>>> from models import User
>>> import asyncio
>>> asyncio.run(User.create(full_name="John Doe"))

Let’s check our database.

Now we can see our new users and alembic_versiontables.

We can as well get the record we just created.

Views

Create views.py file and add the CRUD views.

from fastapi import APIRouter
from pydantic import BaseModel
from models import User
class UserSchema(BaseModel):
    full_name: str
class UserSerializer(BaseModel):
    id: str
    full_name: str
class Config:
        orm_mode = True
api = APIRouter(
    prefix="/users",
)
@api.post("/", response_model=UserSerializer)
async def create_user(user: UserSchema):
    user = await User.create(**user.dict())
    return user
@api.get("/{id}", response_model=UserSerializer)
async def get_user(id: str):
    user = await User.get(id)
    return user
@api.get("/", response_model=List[UserSerializer])
async def get_all_users():
    users = await User.get_all()
    return users
@api.put("/{id}", response_model=UserSerializer)
async def update(id: str, user: UserSchema):
    user = await User.update(id, **user.dict())
    return user
@api.delete("/{id}", response_model=bool)
async def delete_user(id: str):
    return await User.delete(id)

Entry point

Create main.py file and add the following.

from database import db
from fastapi import FastAPI
def init_app():
    db.init()
    app = FastAPI(
        title="Users App",
        description="Handling Our Users",
        version="1",
    )
    @app.on_event("startup")
    async def startup():
        await db.create_all()
    @app.on_event("shutdown")
    async def shutdown():
        await db.close()
    from views import api
    app.include_router(
        api,
        prefix="/api/v1",
    )
    return app
app = init_app()

Testing

Now it is time to run and test our applications.

uvicorn main:app --reload

checking /docs

Let’s test the API endpoints, I know the OpenAPI docs are amazing, but I would recommend using httpie for a change.

  1. Create user

2. The wrong name, update to Will Smith

3. Let’s get the user we just updated.

4. Time to remove it

To stop the running Postgres db

docker stop postgres

This is a simple implementation for FastAPI with the SQLAlchemy Async ORM. The full code can be found here on Github.

Hint: A new article has been released with SQLAlchemy 2.0, check it out here

Python
Async
Alembic
Fastapi
Sqlalchemy
Recommended from ReadMedium