跳至内容

SQL(关系型)数据库

信息

这些文档即将更新。🎉

当前版本假设使用 Pydantic v1 和小于 2.0 的 SQLAlchemy 版本。

新文档将包括 Pydantic v2,并将使用 SQLModel(它也是基于 SQLAlchemy 的),一旦它更新为也使用 Pydantic v2。

FastAPI 不需要您使用 SQL(关系型)数据库。

但您可以使用任何您想要的数据库。

在这里,我们将看到一个使用 SQLAlchemy 的示例。

您可以轻松地将其调整为 SQLAlchemy 支持的任何数据库,例如

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server 等。

在这个示例中,我们将使用 SQLite,因为它使用单个文件,并且 Python 有内置支持。因此,您可以复制此示例并按原样运行它。

稍后,对于您的生产应用程序,您可能想要使用像 PostgreSQL 这样的数据库服务器。

提示

有一个使用 FastAPIPostgreSQL 的官方项目生成器,它们都基于 Docker,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql

注意

请注意,大多数代码都是您将在任何框架中使用的标准 SQLAlchemy 代码。

FastAPI 特定的代码一如既往地少。

ORM

FastAPI 可与任何数据库和任何与数据库通信的库样式配合使用。

一个常见的模式是使用“ORM”:“对象关系映射”库。

ORM 拥有将代码中的对象与数据库表 (“关系”) 之间进行转换 (“映射”) 的工具。

使用 ORM,您通常会创建一个类来表示 SQL 数据库中的表,该类的每个属性都表示一个列,包括名称和类型。

例如,类 Pet 可以表示 SQL 表 pets

并且该类的每个实例对象都表示数据库中的一行。

例如,一个名为 orion_cat 的对象(一个 Pet 类的实例)可以有一个名为 orion_cat.type 的属性,对应 type 列。这个属性的值可以是 "cat"

这些 ORM 还提供了工具来建立表或实体之间的连接或关系。

这样,你还可以有一个名为 orion_cat.owner 的属性,其中包含这个宠物主人的数据,这些数据来自 owners 表。

因此,orion_cat.owner.name 可以是这个宠物主人的姓名(来自 owners 表中的 name 列)。

它可以具有 "Arquilian" 这样的值。

当你尝试从你的宠物对象访问它时,ORM 会完成所有工作,从相应的 owners 表中获取信息。

常见的 ORM 包括:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架),以及其他。

这里我们将看到如何使用 SQLAlchemy ORM

你也可以以类似的方式使用任何其他 ORM。

提示

在 docs 中有一个使用 Peewee 的等效文章。

文件结构

对于这些示例,假设你有一个名为 my_super_project 的目录,其中包含一个名为 sql_app 的子目录,其结构如下

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

文件 __init__.py 只是一个空文件,但它告诉 Python,sql_app 及其所有模块(Python 文件)是一个包。

现在让我们看看每个文件/模块的作用。

安装 SQLAlchemy

首先你需要安装 SQLAlchemy

确保你创建了一个 虚拟环境,激活它,然后安装它,例如

$ pip install sqlalchemy

---> 100%

创建 SQLAlchemy 部分

让我们参考文件 sql_app/database.py

导入 SQLAlchemy 部分

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

为 SQLAlchemy 创建一个数据库 URL

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

在这个示例中,我们“连接”到一个 SQLite 数据库(打开一个包含 SQLite 数据库的文件)。

该文件将位于文件 sql_app.db 的同一目录中。

这就是最后一部分是 ./sql_app.db 的原因。

如果你使用的是 PostgreSQL 数据库,你只需取消注释这行

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

...并使用你的数据库数据和凭据进行调整(对 MySQL、MariaDB 或其他数据库也是如此)。

提示

这是你需要修改的主要行,如果你想使用其他数据库。

创建 SQLAlchemy engine

第一步是创建一个 SQLAlchemy "engine"。

我们稍后将在其他地方使用这个 engine

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

注意

参数

connect_args={"check_same_thread": False}

...仅适用于 SQLite。它不适用于其他数据库。

"技术细节"

默认情况下,SQLite 仅允许一个线程与其通信,假设每个线程都会处理一个独立的请求。

这是为了防止意外地为不同的内容(针对不同的请求)共享相同的连接。

但在 FastAPI 中,使用普通函数 (def),多个线程可能会为同一个请求与数据库交互,因此我们需要让 SQLite 知道它应该使用 connect_args={"check_same_thread": False} 来允许这样做。

此外,我们将确保每个请求在依赖项中获得自己的数据库连接会话,因此不需要该默认机制。

创建 SessionLocal

SessionLocal 类的每个实例都是一个数据库会话。该类本身还不是一个数据库会话。

但一旦我们创建了 SessionLocal 类的实例,这个实例将成为实际的数据库会话。

我们将其命名为 SessionLocal,以将其与我们从 SQLAlchemy 导入的 Session 区分开来。

我们稍后将使用 Session(从 SQLAlchemy 导入的)。

要创建 SessionLocal 类,请使用函数 sessionmaker

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建一个 Base

现在我们将使用函数 declarative_base(),它返回一个类。

稍后我们将从这个类继承,以创建每个数据库模型或类(ORM 模型)。

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建数据库模型

现在让我们看看文件 sql_app/models.py

Base 类创建 SQLAlchemy 模型

我们将使用之前创建的 Base 类来创建 SQLAlchemy 模型。

提示

SQLAlchemy 使用术语“模型”来指代这些与数据库交互的类和实例。

但 Pydantic 也使用术语“模型”来指代不同的东西,即数据验证、转换和文档类和实例。

database 导入 Base(来自上面的 database.py 文件)。

创建从它继承的类。

这些类是 SQLAlchemy 模型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

__tablename__ 属性告诉 SQLAlchemy 为这些模型中的每一个在数据库中使用的表名。

创建模型属性/列

现在创建所有模型(类)属性。

这些属性中的每一个都代表其对应数据库表中的一个列。

我们使用 SQLAlchemy 的 Column 作为默认值。

我们传入一个 SQLAlchemy 类“类型”,例如 IntegerStringBoolean,作为参数,它定义了数据库中的类型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

创建关系

现在创建关系。

为此,我们使用 SQLAlchemy ORM 提供的 relationship

这将或多或少成为一个“魔术”属性,它将包含来自与该表相关的其他表的值。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

当访问 User 中的 items 属性时,如 my_user.items,它将包含一个 Item SQLAlchemy 模型列表(来自 items 表),这些模型具有指向 users 表中此记录的外键。

当你访问 my_user.items 时,SQLAlchemy 实际上会进入 items 表并获取这些项目,并将它们填充到这里。

当访问 Item 中的 owner 属性时,它将包含一个 User SQLAlchemy 模型,来自 users 表。它将使用具有外键的 owner_id 属性/列来了解从 users 表中获取哪个记录。

创建 Pydantic 模型

现在让我们检查文件 sql_app/schemas.py

提示

为了避免 SQLAlchemy 模型和 Pydantic 模型之间的混淆,我们将使用 models.py 文件包含 SQLAlchemy 模型,schemas.py 文件包含 Pydantic 模型。

这些 Pydantic 模型或多或少定义了一个“模式”(有效的数据形状)。

因此这将有助于我们在使用两者时避免混淆。

创建初始 Pydantic 模型 / 模式

创建 ItemBaseUserBase Pydantic 模型(或称之为“模式”),以便在创建或读取数据时具有共同属性。

并创建一个 ItemCreateUserCreate,从它们继承(因此它们将具有相同的属性),以及创建所需的任何其他数据(属性)。

因此,用户在创建时还将拥有一个 password

但出于安全考虑,password 将不会出现在其他 Pydantic 模型中,例如,它不会在读取用户时从 API 发送。

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

SQLAlchemy 风格和 Pydantic 风格

注意 SQLAlchemy 模型使用 = 定义属性,并将类型作为参数传递给 Column,如

name = Column(String)

而 Pydantic 模型使用 : 声明类型,即新的类型注释语法/类型提示

name: str

请牢记这些,以便在使用它们时不会将 =: 混淆。

创建 Pydantic 模型 / 模式用于读取/返回

现在创建 Pydantic 模型(模式),这些模型将在读取数据时使用,即从 API 返回数据时。

例如,在创建项目之前,我们不知道将为它分配什么 ID,但当读取它(从 API 返回它)时,我们已经知道它的 ID。

同样,当读取用户时,我们现在可以声明 items 将包含属于此用户的项目。

不仅包含这些项目的 ID,还包含我们在 Pydantic 模型中定义的所有用于读取项目的数据:Item

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

提示

注意,User(用于读取用户(从 API 返回它)的 Pydantic 模型)不包含 password

使用 Pydantic 的 orm_mode

现在,在用于读取的 Pydantic 模型中,ItemUser,添加一个内部 Config 类。

这个 Config 类用于向 Pydantic 提供配置。

Config 类中,设置属性 orm_mode = True

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

提示

注意它使用 = 赋值,就像

orm_mode = True

它不使用 : 作为之前类型声明的符号。

这是设置一个配置值,而不是声明一个类型。

Pydantic 的 orm_mode 将告诉 Pydantic 模型即使数据不是 dict,而是 ORM 模型(或任何其他具有属性的任意对象),也要读取数据。

这样,它不仅会尝试从 dict 中获取 id 值,如

id = data["id"]

它还会尝试从属性中获取它,如

id = data.id

有了它,Pydantic 模型与 ORM 兼容,你只需在你的路径操作中的 response_model 参数中声明它。

你可以返回一个数据库模型,它将从该模型中读取数据。

关于 ORM 模式技术细节

SQLAlchemy 和许多其他 ORM 默认情况下是“延迟加载”。

这意味着,例如,它们不会从数据库中获取关系的数据,除非你尝试访问包含该数据的属性。

例如,访问属性 items

current_user.items

将使 SQLAlchemy 进入 items 表并获取该用户的项目,但在访问之前不会。

如果没有 orm_mode,如果你从你的路径操作中返回一个 SQLAlchemy 模型,它将不包含关系数据。

即使你在你的 Pydantic 模型中声明了这些关系。

但使用 ORM 模式时,由于 Pydantic 本身会尝试从属性中访问它所需的数据(而不是假设为 dict),因此您可以声明要返回的特定数据,它能够获取它,即使是从 ORM 中。

CRUD 工具

现在让我们看看文件 sql_app/crud.py

在这个文件中,我们将拥有可重复使用的函数来与数据库中的数据进行交互。

CRUD 来自:**C**reate(创建)、**R**ead(读取)、**U**pdate(更新)和 **D**elete(删除)。

...虽然在这个例子中我们只创建和读取。

读取数据

sqlalchemy.orm 中导入 Session,这将允许您声明 db 参数的类型,并在您的函数中进行更好的类型检查和完成。

导入 models(SQLAlchemy 模型)和 schemas(Pydantic 模型 / 架构)。

创建实用程序函数来

  • 按 ID 和电子邮件读取单个用户。
  • 读取多个用户。
  • 读取多个项目。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

提示

通过创建仅专注于与数据库交互(获取用户或项目)的独立于您的路径操作函数的函数,您可以更容易地将它们重复使用在多个部分,并为它们添加 单元测试

创建数据

现在创建实用程序函数来创建数据。

步骤如下

  • 使用您的数据创建 SQLAlchemy 模型实例
  • 将该实例对象add 到您的数据库会话中。
  • commit 对数据库的更改(以便它们被保存)。
  • refresh 您的实例(以便它包含来自数据库的任何新数据,例如生成的 ID)。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

信息

在 Pydantic v1 中,该方法被称为 .dict(),它在 Pydantic v2 中已弃用(但仍受支持),并重命名为 .model_dump()

这里的示例使用 .dict() 与 Pydantic v1 兼容,但是如果您能使用 Pydantic v2,则应该使用 .model_dump() 代替。

提示

User 的 SQLAlchemy 模型包含一个 hashed_password,它应该包含密码的安全散列版本。

但是,由于 API 客户端提供的是原始密码,您需要提取它并在应用程序中生成散列密码。

然后将 hashed_password 参数连同值一起传递以进行保存。

警告

这个例子不安全,密码没有散列。

在实际应用中,您需要对密码进行散列,并且永远不要以明文形式保存它们。

有关更多详细信息,请返回教程中的安全部分。

这里我们只关注数据库的工具和机制。

提示

我们没有将每个关键字参数传递给 Item 并在 Pydantic 模型中读取它们中的每一个,而是使用

item.dict()

生成了一个包含 Pydantic 模型数据的 dict,然后我们将 dict 的键值对作为关键字参数传递给 SQLAlchemy Item,使用

Item(**item.dict())

然后,我们传递额外的关键字参数 owner_id,该参数未由 Pydantic 模型提供,使用

Item(**item.dict(), owner_id=user_id)

主要的 FastAPI 应用程序

现在在文件 sql_app/main.py 中,让我们整合和使用之前创建的所有其他部分。

创建数据库表

以非常简单的方式创建数据库表

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Alembic 注意

通常,您可能会使用 Alembic 初始化您的数据库(创建表等)。

您还将使用 Alembic 进行“迁移”(这是它的主要工作)。

“迁移”是在您更改 SQLAlchemy 模型的结构、添加新属性等时所需的步骤集,以便在数据库中复制这些更改,添加新列、新表等。

您可以在 完整堆栈 FastAPI 模板 中的 FastAPI 项目中找到 Alembic 的示例。具体来说,在 源代码中的 alembic 目录 中。

创建依赖项

现在使用我们在 sql_app/database.py 文件中创建的 SessionLocal 类来创建依赖项。

我们需要为每个请求拥有一个独立的数据库会话/连接(SessionLocal),在整个请求中使用同一个会话,然后在请求完成之后关闭它。

然后,将为下一个请求创建一个新的会话。

为此,我们将使用 yield 创建一个新的依赖项,如前面关于 使用 yield 的依赖项 部分中所述。

我们的依赖项将创建一个新的 SQLAlchemy SessionLocal,它将在单个请求中使用,然后在请求完成之后关闭它。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

信息

我们将 SessionLocal() 的创建和请求的处理放在一个 try 块中。

然后我们在 finally 块中关闭它。

这样,我们确保数据库会话在请求之后始终关闭。即使在处理请求时出现异常。

但是您不能从退出代码(在 yield 之后)引发另一个异常。有关更多信息,请参阅 使用 yieldHTTPException 的依赖项

然后,在路径操作函数中使用依赖项时,我们使用我们直接从 SQLAlchemy 导入的类型 Session 声明它。

这将为我们在路径操作函数内部提供更好的编辑器支持,因为编辑器将知道 db 参数的类型是 Session

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

"技术细节"

参数 db 实际上是 SessionLocal 类型,但是这个类(使用 sessionmaker() 创建)是 SQLAlchemy Session 的“代理”,因此,编辑器实际上并不知道提供了哪些方法。

但是通过将类型声明为 Session,编辑器现在可以知道可用的方法(.add().query().commit() 等),并可以提供更好的支持(例如完成)。类型声明不会影响实际对象。

创建您的 FastAPI 路径操作

现在,最后,这是标准 FastAPI 路径操作代码。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

我们使用 yield 在每个请求之前创建数据库会话,然后在之后关闭它。

然后,我们可以在路径操作函数中创建所需的依赖项,以直接获取该会话。

有了它,我们就可以直接从路径操作函数内部调用 crud.get_user 并使用该会话。

提示

请注意,您返回的值是 SQLAlchemy 模型,或 SQLAlchemy 模型的列表。

但是,由于所有路径操作都使用 orm_mode 在 Pydantic 模型 / 架构中具有 response_model,因此将从您的 Pydantic 模型中提取声明的数据并返回给客户端,并进行所有正常的过滤和验证。

提示

另请注意,有一些 response_models 具有标准的 Python 类型,例如 List[schemas.Item]

但是,由于该 List 的内容/参数是使用 orm_mode 的 Pydantic 模型,因此数据将像往常一样检索并返回给客户端,不会出现问题。

关于 defasync def

这里我们在路径操作函数和依赖项中使用 SQLAlchemy 代码,反过来,它将与外部数据库进行通信。

这可能会需要一些“等待”。

但是,由于 SQLAlchemy 不兼容直接使用 await,就像使用

user = await db.query(User).first()

...相反,我们使用

user = db.query(User).first()

那么我们应该在路径操作函数和依赖项中声明路径操作函数,而不要使用 async def,只需使用普通的 def,例如

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    ...

信息

如果您需要异步连接到关系数据库,请参阅 异步 SQL(关系)数据库

“非常技术性的细节”

如果您好奇并且具备深厚技术知识,您可以查看 异步 文档中关于 async defdef 的处理方式的非常技术性的细节。

迁移

因为我们直接使用 SQLAlchemy,并且不需要任何插件来让它与 FastAPI 一起工作,所以我们可以直接使用 Alembic 集成数据库 迁移

而且,由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码位于单独的独立文件中,您甚至可以在不安装 FastAPI、Pydantic 或其他任何东西的情况下使用 Alembic 执行迁移。

同样地,您也可以在与 FastAPI 无关的代码的其他部分中使用相同的 SQLAlchemy 模型和实用程序。

例如,在使用 CeleryRQARQ 的后台任务工作程序中。

查看所有文件

请记住,您应该有一个名为 my_super_project 的目录,该目录包含一个名为 sql_app 的子目录。

sql_app 应该包含以下文件

  • sql_app/__init__.py:是一个空文件。

  • sql_app/database.py:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • sql_app/models.py:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")
  • sql_app/schemas.py:
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
  • sql_app/crud.py:
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
  • sql_app/main.py:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

检查它

您可以复制此代码并按原样使用它。

信息

事实上,这里显示的代码是测试的一部分。就像这些文档中的大部分代码一样。

然后,您可以使用 Uvicorn 运行它

$ uvicorn sql_app.main:app --reload

<span style="color: green;">INFO</span>:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

然后,您可以在浏览器中打开 http://127.0.0.1:8000/docs

您将能够与您的 FastAPI 应用程序进行交互,从真实的数据库中读取数据

直接与数据库交互

如果您想独立于 FastAPI 直接探索 SQLite 数据库(文件),以调试其内容、添加表、列、记录、修改数据等,可以使用 DB Browser for SQLite

它看起来像这样

您也可以使用在线 SQLite 浏览器,例如 SQLite ViewerExtendsClass

使用中间件的替代 DB 会话

如果您不能使用带有 yield 的依赖项——例如,如果您没有使用 Python 3.7 并且不能为 Python 3.6 安装上面提到的“backports”——您可以以类似的方式在“中间件”中设置会话。

“中间件”基本上是一个始终为每个请求执行的函数,在该函数中,一些代码在端点函数之前执行,一些代码在端点函数之后执行。

创建中间件

我们将添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemy SessionLocal,将其添加到请求中,然后在请求完成后关闭它。

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

信息

我们将 SessionLocal() 的创建和请求的处理放在一个 try 块中。

然后我们在 finally 块中关闭它。

这样,我们确保数据库会话在请求之后始终关闭。即使在处理请求时出现异常。

关于 request.state

request.state 是每个 Request 对象的一个属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。您可以在 Starlette 关于 Request 状态的文档 中了解更多信息。

在本例中,它帮助我们确保在整个请求过程中使用单个数据库会话,然后在之后(在中间件中)关闭它。

使用 yield 的依赖项或中间件

在此处添加一个中间件类似于使用 yield 的依赖项,但有一些区别

  • 它需要更多代码,并且稍微复杂一些。
  • 中间件必须是一个 async 函数。
    • 如果其中有必须“等待”网络的代码,它可能会在那里“阻塞”您的应用程序并稍微降低性能。
    • 虽然对于 SQLAlchemy 的工作方式来说,这可能不是什么大问题。
    • 但是,如果您在中间件中添加了更多代码,其中包含大量 I/O 等待,那么它就会有问题。
  • 中间件会为每个请求运行。
    • 因此,将为每个请求创建一个连接。
    • 即使处理该请求的路径操作不需要数据库。

提示

当依赖项与 yield 足够满足用例时,最好使用它们。

信息

使用 yield 的依赖项最近被添加到 FastAPI 中。

本教程的早期版本只包含使用中间件的示例,并且可能存在许多应用程序使用中间件来管理数据库会话。