生命周期事件¶
你可以定义应在应用程序**启动前**执行的逻辑(代码)。这意味着此代码将在应用程序**开始接收请求之前**,**只执行一次**。
同样,你可以定义应在应用程序**关闭时**执行的逻辑(代码)。在这种情况下,此代码将在处理了可能**许多请求之后**,**只执行一次**。
由于此代码在应用程序**开始**接收请求之前执行,并在应用程序**完成**处理请求之后立即执行,它涵盖了整个应用程序的**生命周期**(“生命周期”这个词在稍后会很重要😉)。
这对于设置你需要用于整个应用程序的**资源**非常有用,这些资源在请求之间是**共享的**,并且/或者你需要在之后**清理**它们。例如,数据库连接池,或者加载共享的机器学习模型。
用例¶
让我们从一个示例**用例**开始,然后看看如何用这个解决它。
假设你有一些**机器学习模型**,你想用它们来处理请求。🤖
相同的模型在请求之间共享,所以不是每个请求一个模型,或者每个用户一个模型,或者类似的情况。
假设加载模型可能**需要相当长的时间**,因为它必须从**磁盘读取大量数据**。所以你不想为每个请求都这样做。
你可以在模块/文件的顶层加载它,但这也会意味着即使你只是运行一个简单的自动化测试,它也会**加载模型**,那么这个测试就会**很慢**,因为它必须等待模型加载,然后才能运行代码的独立部分。
这就是我们要解决的问题,让我们在处理请求之前加载模型,但只在应用程序开始接收请求之前加载,而不是在代码加载时加载。
生命周期¶
你可以使用 FastAPI 应用的 lifespan 参数和“上下文管理器”(我稍后会告诉你那是什么)来定义这种*启动*和*关闭*逻辑。
让我们从一个例子开始,然后详细了解它。
我们像这样用 yield 创建一个异步函数 lifespan()
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
在这里,我们通过在 yield 之前将(假的)模型函数放在机器学习模型字典中,模拟加载模型这种开销大的*启动*操作。这段代码将在应用程序**开始接收请求之前**,在*启动*期间执行。
然后,在 yield 之后,我们卸载模型。这段代码将在应用程序**完成处理请求之后**,在*关闭*之前执行。这可以,例如,释放内存或 GPU 等资源。
提示
当你在**停止**应用程序时,就会发生 shutdown。
也许你需要启动一个新版本,或者你只是厌倦了运行它。🤷
生命周期函数¶
首先要注意的是,我们正在定义一个带有 yield 的异步函数。这与带有 yield 的依赖项非常相似。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
函数的第一部分,即 yield 之前的部分,将在应用程序启动**之前**执行。
而 yield 之后的部分,将在应用程序完成**之后**执行。
异步上下文管理器¶
如果你查看,该函数用 @asynccontextmanager 装饰。
这会将函数转换为一个名为“**异步上下文管理器**”的东西。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
Python 中的**上下文管理器**是你可以在 with 语句中使用的东西,例如,open() 可以用作上下文管理器
with open("file.txt") as file:
file.read()
在 Python 的最新版本中,还有一个**异步上下文管理器**。你会用 async with 来使用它
async with lifespan(app):
await do_stuff()
当你像上面那样创建一个上下文管理器或异步上下文管理器时,它会在进入 with 块之前执行 yield 之前的代码,并在退出 with 块之后执行 yield 之后的代码。
在我们的代码示例中,我们没有直接使用它,而是将其传递给 FastAPI 来使用。
FastAPI 应用的 lifespan 参数接受一个**异步上下文管理器**,所以我们可以将我们新的 lifespan 异步上下文管理器传递给它。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
替代事件(已弃用)¶
警告
处理*启动*和*关闭*的推荐方法是使用上述 FastAPI 应用的 lifespan 参数。如果你提供了 lifespan 参数,startup 和 shutdown 事件处理程序将不再被调用。要么全部使用 lifespan,要么全部使用事件,不能两者都用。
你可能可以跳过这一部分。
还有另一种方法可以定义在*启动*期间和*关闭*期间执行的逻辑。
你可以定义需要在应用程序启动之前或应用程序关闭时执行的事件处理程序(函数)。
这些函数可以用 async def 或普通的 def 声明。
startup 事件¶
要添加一个应在应用程序启动前运行的函数,请使用事件 "startup" 声明它
from fastapi import FastAPI
app = FastAPI()
items = {}
@app.on_event("startup")
async def startup_event():
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}
@app.get("/items/{item_id}")
async def read_items(item_id: str):
return items[item_id]
在这种情况下,startup 事件处理程序函数将使用一些值初始化项目的“数据库”(只是一个 dict)。
你可以添加不止一个事件处理函数。
并且你的应用程序在所有 startup 事件处理程序完成之前不会开始接收请求。
shutdown 事件¶
要添加一个应在应用程序关闭时运行的函数,请使用事件 "shutdown" 声明它
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
def shutdown_event():
with open("log.txt", mode="a") as log:
log.write("Application shutdown")
@app.get("/items/")
async def read_items():
return [{"name": "Foo"}]
在这里,shutdown 事件处理函数将把一行文本 "Application shutdown" 写入文件 log.txt。
信息
在 open() 函数中,mode="a" 表示“追加”,因此,该行将添加到该文件中的任何内容之后,而不会覆盖以前的内容。
提示
请注意,在这种情况下,我们使用的是与文件交互的标准 Python open() 函数。
因此,它涉及 I/O(输入/输出),这需要“等待”数据写入磁盘。
但 open() 不使用 async 和 await。
因此,我们使用标准的 def 而不是 async def 声明事件处理函数。
startup 和 shutdown 一起使用¶
你的*启动*和*关闭*逻辑很可能相互关联,你可能想启动一些东西然后完成它,获取一个资源然后释放它,等等。
在不共享逻辑或变量的独立函数中执行此操作会更困难,因为你需要将值存储在全局变量或类似技巧中。
因此,现在建议改为使用如上所述的 lifespan。
技术细节¶
只是好奇的技术细节。🤓
在 ASGI 技术规范中,这是 生命周期协议 的一部分,它定义了名为 startup 和 shutdown 的事件。
子应用¶
🚨 请记住,这些生命周期事件(启动和关闭)将只为主应用程序执行,而不会为子应用程序 - 挂载执行。