使用 Encode/Databases 的异步 SQL(关系型)数据库(已弃用)¶
"已弃用"
本教程已弃用,将在未来版本中删除。
您也可以使用 encode/databases
与 FastAPI 连接到数据库,使用 async
和 await
。
它与以下内容兼容
- PostgreSQL
- MySQL
- SQLite
在本示例中,我们将使用 SQLite,因为它使用单个文件,并且 Python 具有集成支持。因此,您可以复制此示例并按原样运行。
稍后,对于您的生产应用程序,您可能希望使用像 PostgreSQL 这样的数据库服务器。
提示
您可以从关于 SQLAlchemy ORM 的部分中借鉴想法 (SQL(关系型)数据库),例如使用实用程序函数在数据库中执行操作,独立于您的 FastAPI 代码。
本部分不应用这些想法,以等同于 Starlette 中的对应部分。
导入并设置 SQLAlchemy
¶
- 导入
SQLAlchemy
。 - 创建一个
metadata
对象。 - 使用
metadata
对象创建一个表notes
。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
提示
注意,所有这些代码都是纯 SQLAlchemy Core。
databases
现在还没有执行任何操作。
导入并设置 databases
¶
- 导入
databases
。 - 创建一个
DATABASE_URL
。 - 创建一个
database
对象。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
提示
如果您要连接到其他数据库(例如 PostgreSQL),则需要更改 DATABASE_URL
。
创建表¶
在本例中,我们在同一个 Python 文件中创建表,但在生产环境中,您可能希望使用 Alembic、集成迁移等创建它们。
在这里,本节将直接运行,在启动您的 FastAPI 应用程序之前。
- 创建一个
engine
。 - 从
metadata
对象中创建所有表。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
创建模型¶
为以下内容创建 Pydantic 模型
- 要创建的笔记 (
NoteIn
)。 - 要返回的笔记 (
Note
)。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
通过创建这些 Pydantic 模型,输入数据将被验证、序列化(转换)和注释(记录)。
因此,您将能够在交互式 API 文档中看到所有这些内容。
连接和断开连接¶
- 创建您的
FastAPI
应用程序。 - 创建事件处理程序以连接和断开数据库连接。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
读取笔记¶
创建路径操作函数以读取笔记
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
注意
请注意,由于我们使用 await
与数据库通信,因此路径操作函数 使用 async
声明。
注意 response_model=List[Note]
¶
它使用 typing.List
。
该文档(以及验证、序列化、过滤)输出数据,作为Note
的list
。
创建笔记¶
创建用于创建笔记的路径操作函数
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
信息
在 Pydantic v1 中,该方法被称为.dict()
,它在 Pydantic v2 中已弃用(但仍受支持),并重命名为.model_dump()
。
这里示例使用.dict()
,以保持与 Pydantic v1 的兼容性,但如果您能够使用 Pydantic v2,则应使用.model_dump()
。
注意
请注意,由于我们使用 await
与数据库通信,因此路径操作函数 使用 async
声明。
关于{**note.dict(), "id": last_record_id}
¶
note
是一个 Pydantic Note
对象。
note.dict()
返回一个包含其数据的dict
,类似于
{
"text": "Some note",
"completed": False,
}
但它没有id
字段。
因此,我们创建一个新的dict
,其中包含来自note.dict()
的键值对,并
{**note.dict()}
**note.dict()
直接“解包”键值对,因此,{**note.dict()}
基本上是note.dict()
的副本。
然后,我们扩展该副本dict
,添加另一个键值对:"id": last_record_id
{**note.dict(), "id": last_record_id}
因此,最终返回的结果将类似于
{
"id": 1,
"text": "Some note",
"completed": False,
}
检查它¶
您可以将此代码原样复制,并在http://127.0.0.1:8000/docs查看文档。
在那里,您可以查看所有 API 文档并与之交互
更多信息¶
您可以在encode/databases
的 GitHub 页面上了解更多信息。