使用 Peewee 的 SQL(关系型)数据库 (已弃用)¶
"已弃用"
本教程已弃用,将在未来版本中删除。
警告
如果您刚开始学习,使用 SQLAlchemy 的教程 SQL(关系型)数据库 就足够了。
您可以跳过此部分。
不建议在 FastAPI 中使用 Peewee,因为它与任何异步 Python 代码都不兼容。有几个更好的替代方案。
信息
这些文档假设使用 Pydantic v1。
由于 Pewee 与任何异步代码都不兼容,并且有更好的替代方案,因此我不会为 Pydantic v2 更新这些文档,目前仅出于历史目的保留这些文档。
此处的示例不再在 CI 中进行测试(以前是测试的)。
如果您从头开始一个项目,您可能最好使用 SQLAlchemy ORM (SQL(关系型)数据库),或任何其他异步 ORM。
如果您已经有使用 Peewee ORM 的代码库,您可以在此处查看如何在 FastAPI 中使用它。
"需要 Python 3.7+"
您需要 Python 3.7 或更高版本才能安全地将 Peewee 与 FastAPI 一起使用。
用于异步的 Peewee¶
Peewee 不是为异步框架设计的,也没有考虑到异步框架。
Peewee 对其默认值以及如何使用它有一些严格的假设。
如果您正在使用旧的非异步框架开发应用程序,并且可以使用其所有默认值,它可以是一个很棒的工具。
但是,如果您需要更改某些默认值,支持多个预定义数据库,使用异步框架(如 FastAPI)等,则需要添加一些复杂的额外代码来覆盖这些默认值。
尽管如此,这是可以做到的,在这里您将看到需要添加哪些代码才能在 FastAPI 中使用 Peewee。
相同的应用程序¶
我们将创建与 SQLAlchemy 教程中相同的应用程序 (SQL(关系型)数据库)。
实际上大部分代码都是一样的。
因此,我们将只关注差异。
文件结构¶
假设您有一个名为 my_super_project
的目录,其中包含一个名为 sql_app
的子目录,其结构如下
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
└── schemas.py
这与我们在 SQLAlchemy 教程中使用的结构几乎相同。
现在让我们看看每个文件/模块的作用。
创建 Peewee 部分¶
让我们参考文件 sql_app/database.py
。
标准的 Peewee 代码¶
让我们首先检查所有正常的 Peewee 代码,创建一个 Peewee 数据库
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
提示
请记住,如果您想使用不同的数据库,例如 PostgreSQL,您不能只更改字符串。您需要使用不同的 Peewee 数据库类。
注意¶
参数
check_same_thread=False
等同于 SQLAlchemy 教程中的参数
connect_args={"check_same_thread": False}
…它仅在 SQLite
中需要。
"技术细节"
与SQL(关系型)数据库中完全相同的技术细节适用。
使 Peewee 兼容异步 PeeweeConnectionState
¶
Peewee 和 FastAPI 的主要问题是 Peewee 严重依赖于Python 的 threading.local
,并且它没有直接的方法来覆盖它或让您直接处理连接/会话(如 SQLAlchemy 教程中所做的那样)。
而 threading.local
与现代 Python 的新异步特性不兼容。
"技术细节"
threading.local
用于拥有一个“魔法”变量,该变量对每个线程都有不同的值。
这在旧的框架中很有用,这些框架设计为每个请求只有一个线程,不多不少。
使用此方法,每个请求都将拥有自己的数据库连接/会话,这才是最终的目标。
但是 FastAPI 使用新的异步特性,可以处理同一个线程上的多个请求。同时,对于单个请求,它可以在不同的线程(在线程池中)运行多个内容,具体取决于您使用的是 async def
还是普通的 def
。这就是 FastAPI 提供所有性能改进的原因。
但是 Python 3.7 及更高版本提供了比 threading.local
更高级的替代方案,它也可以在使用 threading.local
的地方使用,但与新的异步特性兼容。
我们将使用它。它被称为contextvars
。
我们将覆盖使用 threading.local
的 Peewee 内部部分,并用 contextvars
替换它们,并进行相应的更新。
这可能看起来有点复杂(实际上确实如此),您不需要完全理解它的工作原理就可以使用它。
我们将创建一个 PeeweeConnectionState
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
此类继承自 Peewee 使用的一个特殊的内部类。
它具有使 Peewee 使用 contextvars
而不是 threading.local
的所有逻辑。
contextvars
的工作方式与 threading.local
略有不同。但是 Peewee 的其余内部代码假设此类与 threading.local
一起工作。
因此,我们需要做一些额外的技巧来使其工作,就像它只是使用 threading.local
一样。__init__
、__setattr__
和 __getattr__
实现所有必要的技巧,以便 Peewee 在不知道它现在与 FastAPI 兼容的情况下使用它。
提示
这只会使 Peewee 在与 FastAPI 一起使用时表现正确。不会随机打开或关闭正在使用的连接,从而导致错误等。
但它并没有赋予 Peewee 异步超能力。您仍然应该使用普通的 def
函数而不是 async def
。
使用自定义的 PeeweeConnectionState
类¶
现在,使用新的 PeeweeConnectionState
覆盖 Peewee 数据库 db
对象中的 ._state
内部属性
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
提示
确保在创建 db
*之后*覆盖 db._state
。
提示
对于任何其他 Peewee 数据库,包括 PostgresqlDatabase
、MySQLDatabase
等,您都会执行相同的操作。
创建数据库模型¶
现在让我们看看文件 sql_app/models.py
。
为我们的数据创建 Peewee 模型¶
现在为 User
和 Item
创建 Peewee 模型(类)。
如果您遵循 Peewee 教程并将模型更新为与 SQLAlchemy 教程中的数据相同,则会执行相同的操作。
提示
Peewee 还使用术语“模型”来指代这些与数据库交互的类和实例。
但是 Pydantic 也使用术语“模型”来指代不同的东西,即数据验证、转换和文档类和实例。
从 database
中导入 db
(上面提到的 database.py
文件)并在此处使用它。
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
提示
Peewee 创建了几个魔法属性。
它将自动添加一个 id
属性作为整数,作为主键。
它将根据类名选择表名。
对于 Item
,它将创建一个具有 User
的整数 ID 的属性 owner_id
。但我们没有在任何地方声明它。
创建 Pydantic 模型¶
现在让我们检查文件 sql_app/schemas.py
。
提示
为了避免 Peewee 模型和 Pydantic 模型之间的混淆,我们将使用包含 Peewee 模型的文件 models.py
,以及包含 Pydantic 模型的文件 schemas.py
。
这些 Pydantic 模型或多或少定义了一个“模式”(有效的數據形狀)。
因此,这将有助于我们在使用两者时避免混淆。
创建 Pydantic 模型/模式¶
创建与 SQLAlchemy 教程中所有相同的 Pydantic 模型
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
提示
在这里,我们正在创建带有 id
的模型。
我们没有在 Peewee 模型中显式指定 id
属性,但 Peewee 会自动添加一个。
我们还将魔法 owner_id
属性添加到 Item
中。
为 Pydantic 模型/模式创建 PeeweeGetterDict
¶
当您访问 Peewee 对象中的关系时,例如在 some_user.items
中,Peewee 不会提供 Item
的 list
。
它提供了一个名为 ModelSelect
的类的特殊自定义对象。
可以使用 list(some_user.items)
创建其项目的 list
。
但对象本身不是 list
。它也不是实际的 Python 生成器。因此,Pydantic 默认不知道如何将其转换为 Pydantic 模型/模式的 list
。
但是 Pydantic 的最新版本允许提供一个继承自 pydantic.utils.GetterDict
的自定义类,以提供在使用 orm_mode = True
检索 ORM 模型属性的值时使用的功能。
我们将创建一个自定义的 PeeweeGetterDict
类,并在所有使用 orm_mode
的相同 Pydantic 模型/模式中使用它
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
在这里,我们正在检查正在访问的属性(例如 some_user.items
中的 .items
)是否是 peewee.ModelSelect
的实例。
如果是这种情况,只需返回一个包含它的 list
。
然后,我们在使用 orm_mode = True
的 Pydantic 模型/模式中使用它,配置变量为 getter_dict = PeeweeGetterDict
。
提示
我们只需要创建一个 PeeweeGetterDict
类,就可以在所有 Pydantic 模型/模式中使用它。
CRUD 工具¶
现在让我们看看文件 sql_app/crud.py
。
创建所有 CRUD 工具¶
创建与 SQLAlchemy 教程中所有相同的 CRUD 工具,所有代码都非常相似
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
与 SQLAlchemy 教程的代码有一些区别。
我们没有传递 db
属性。相反,我们直接使用模型。这是因为 db
对象是一个全局对象,包含所有连接逻辑。这就是我们必须在上面进行所有 contextvars
更新的原因。
此外,在返回多个对象时,例如在 get_users
中,我们直接调用 list
,如下所示:
list(models.User.select())
这是因为我们必须创建自定义 PeeweeGetterDict
的原因相同。但是,通过返回已经是 list
的内容而不是 peewee.ModelSelect
,路径操作中带有 List[models.User]
的 response_model
(我们稍后会看到)将正常工作。
主要 FastAPI 应用¶
现在,在文件 sql_app/main.py
中,让我们集成并使用我们之前创建的所有其他部分。
创建数据库表¶
以非常简单的方式创建数据库表
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
创建依赖项¶
创建一个依赖项,它将在请求开始时连接数据库,并在请求结束时断开连接
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
这里我们有一个空的 yield
,因为我们实际上没有直接使用数据库对象。
它连接到数据库并将连接数据存储在一个内部变量中,该变量对于每个请求都是独立的(使用上面的 contextvars
技巧)。
由于数据库连接可能阻塞 I/O,因此此依赖项是使用普通的 def
函数创建的。
然后,在每个需要访问数据库的路径操作函数中,我们将其添加为依赖项。
但我们没有使用此依赖项给出的值(它实际上没有给出任何值,因为它有一个空的 yield
)。因此,我们不将其添加到路径操作函数中,而是将其添加到 dependencies
参数中的路径操作装饰器中
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
上下文变量子依赖项¶
为了使所有 contextvars
部分正常工作,我们需要确保每个使用数据库的请求在 ContextVar
中都有一个独立的值,并且该值将用作整个请求的数据库状态(连接、事务等)。
为此,我们需要创建另一个 async
依赖项 reset_db_state()
,它用作 get_db()
中的子依赖项。它将为上下文变量设置一个值(仅使用默认的 dict
),该值将用作整个请求的数据库状态。然后,依赖项 get_db()
将在其中存储数据库状态(连接、事务等)。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
对于下一个请求,由于我们将在 async
依赖项 reset_db_state()
中再次重置该上下文变量,然后在 get_db()
依赖项中创建一个新连接,因此该新请求将拥有自己的数据库状态(连接、事务等)。
提示
由于 FastAPI 是一个异步框架,因此一个请求可能会开始处理,在完成之前,可能会收到另一个请求并开始处理,并且所有这些都可能在同一个线程中处理。
但是上下文变量知道这些异步特性,因此,在async
依赖项reset_db_state()
中设置的Peewee数据库状态将在整个请求过程中保持其自身数据。
同时,另一个并发请求将拥有自己的数据库状态,该状态在整个请求中将是独立的。
Peewee代理¶
如果您正在使用Peewee代理,则实际数据库位于db.obj
中。
因此,您可以使用以下方法重置它:
async def reset_db_state():
database.db.obj._state._state.set(db_state_default.copy())
database.db.obj._state.reset()
创建您的FastAPI路径操作¶
现在,最后,这是标准的FastAPI路径操作代码。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
关于def
与async def
¶
与SQLAlchemy一样,我们不会执行以下操作:
user = await models.User.select().first()
…而是使用以下方法:
user = models.User.select().first()
因此,同样,我们应该像这样声明路径操作函数和依赖项,不使用async def
,只使用普通的def
:
# Something goes here
def read_users(skip: int = 0, limit: int = 100):
# Something goes here
使用async测试Peewee¶
此示例包含一个额外的路径操作,使用time.sleep(sleep_time)
模拟长时间处理请求。
它将在开始时打开数据库连接,并在回复之前等待几秒钟。每个新请求将等待少一秒。
这将使您能够轻松测试您的Peewee和FastAPI应用程序在所有线程相关方面是否正常运行。
如果您想检查Peewee在未经修改的情况下使用时如何破坏您的应用程序,请转到sql_app/database.py
文件并注释以下行:
# db._state = PeeweeConnectionState()
在sql_app/main.py
文件中,注释async
依赖项reset_db_state()
的主体,并用pass
替换它。
async def reset_db_state():
# database.db._state._state.set(db_state_default.copy())
# database.db._state.reset()
pass
然后使用Uvicorn运行您的应用程序。
$ uvicorn sql_app.main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
在浏览器中打开http://127.0.0.1:8000/docs并创建几个用户。
然后在http://127.0.0.1:8000/docs#/default/read_slow_users_slowusers__get同时打开10个标签页。
转到所有标签页中的路径操作“获取/slowusers/
”。使用“试用”按钮并在每个标签页中依次执行请求。
这些标签页将等待一段时间,然后其中一些将显示内部服务器错误
。
发生了什么¶
第一个标签页将使您的应用程序创建到数据库的连接,并在回复并关闭数据库连接之前等待几秒钟。
然后,对于下一个标签页中的请求,您的应用程序将等待少一秒,依此类推。
这意味着它最终将比某些先前的请求更早地完成某些最后一个标签页的请求。
然后,等待时间较短的最后一个请求之一将尝试打开数据库连接,但是由于其他标签页的先前请求之一可能与第一个请求在同一个线程中处理,因此它将拥有同一个已经打开的数据库连接,Peewee 将抛出一个错误,您将在终端中看到它,并且响应将包含一个内部服务器错误
。
这可能会发生在多个标签页中。
如果有多个客户端在完全相同的时间与您的应用程序通信,则可能会发生这种情况。
并且随着您的应用程序开始同时处理越来越多的客户端,单个请求中的等待时间需要越来越短才能触发错误。
修复FastAPI中的Peewee¶
现在返回到sql_app/database.py
文件,并取消注释以下行:
db._state = PeeweeConnectionState()
在sql_app/main.py
文件中,取消注释async
依赖项reset_db_state()
的主体。
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
终止正在运行的应用程序并重新启动它。
使用10个标签页重复相同的过程。这次,所有标签页都将等待,并且您将获得所有结果而不会出现错误。
…您修复了它!
查看所有文件¶
请记住,您应该有一个名为my_super_project
(或您想要的任何名称)的目录,其中包含一个名为sql_app
的子目录。
sql_app
应该包含以下文件:
-
sql_app/__init__.py
:是一个空文件。 -
sql_app/database.py
:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
sql_app/models.py
:
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
sql_app/schemas.py
:
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
sql_app/crud.py
:
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
sql_app/main.py
:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
技术细节¶
警告
这些是非常技术性的细节,您可能不需要了解。
问题¶
Peewee默认使用threading.local
来存储其数据库“状态”数据(连接、事务等)。
threading.local
为当前线程创建了一个排他值,但是异步框架将在同一个线程中运行所有代码(例如,对于每个请求),并且可能不是按顺序运行。
最重要的是,异步框架可以在线程池中运行一些同步代码(使用asyncio.run_in_executor
),但属于同一个请求。
这意味着,使用Peewee的当前实现,多个任务可能正在使用同一个threading.local
变量,并最终共享同一个连接和数据(它们不应该共享),并且同时,如果它们在线程池中执行同步I/O阻塞代码(如FastAPI中的普通def
函数,在路径操作和依赖项中),则该代码将无法访问数据库状态变量,即使它属于同一个请求并且应该能够访问同一个数据库状态。
上下文变量¶
Python 3.7具有contextvars
,它可以创建一个与threading.local
非常类似的局部变量,但也支持这些异步特性。
需要牢记一些事项。
ContextVar
必须在模块顶部创建,例如:
some_var = ContextVar("some_var", default="default value")
要设置在当前“上下文”(例如,对于当前请求)中使用的值,请使用:
some_var.set("new value")
要在上下文的任何位置(例如,在处理当前请求的任何部分)获取值,请使用:
some_var.get()
在async
依赖项reset_db_state()
中设置上下文变量¶
如果异步代码的某一部分使用some_var.set("updated in function")
设置值(例如,像async
依赖项一样),则其中的其余代码以及后续的代码(包括使用await
调用的async
函数内部的代码)将看到该新值。
因此,在我们的例子中,如果我们在async
依赖项中设置Peewee状态变量(使用默认的dict
),则应用程序中的所有其他内部代码将看到此值,并能够在整个请求中重复使用它。
并且对于下一个请求,即使它们是并发的,上下文变量也将被重新设置。
在依赖项get_db()
中设置数据库状态¶
由于get_db()
是一个普通的def
函数,FastAPI将在线程池中运行它,并使用“上下文”的副本,该副本持有上下文变量(包含重置数据库状态的dict
)的相同值。然后它可以向该dict
添加数据库状态,例如连接等。
但是,如果上下文变量(默认的dict
)的值是在该普通的def
函数中设置的,它将创建一个新的值,该值将只保留在该线程池的线程中,其余代码(如路径操作函数)将无法访问它。在get_db()
中,我们只能在dict
中设置值,而不能设置整个dict
本身。
因此,我们需要使用async
依赖项reset_db_state()
在上下文变量中设置dict
。这样,所有代码都可以访问单个请求的数据库状态的同一个dict
。
在依赖项get_db()
中连接和断开连接¶
那么下一个问题是,为什么不直接在async
依赖项本身中连接和断开数据库连接,而是在get_db()
中连接和断开连接呢?
async
依赖项必须是async
,以便上下文变量在请求的其余部分中保留,但是创建和关闭数据库连接可能会阻塞,因此如果它位于此处,可能会降低性能。
所以我们也需要普通的def
依赖项get_db()
。