生命周期事件¶
你可以定义在应用程序 **启动** 之前应该执行的逻辑(代码)。这意味着这段代码将 **执行一次**,在应用程序 **开始接收请求** **之前** 执行。
同样,你也可以定义在应用程序 **关闭** 时应该执行的逻辑(代码)。在这种情况下,这段代码将 **执行一次**,在可能处理完 **多个请求** **之后** 执行。
因为这段代码在应用程序 **开始** 处理请求之前执行,并且在它 **完成** 处理请求之后立即执行,所以它涵盖了整个应用程序 **生命周期**(“生命周期” 这个词在后面会很重要 😉)。
这对于设置你整个应用程序需要使用的 **资源** 非常有用,这些资源在请求之间是 **共享** 的,或者你之后需要 **清理** 它们。例如,数据库连接池,或者加载共享的机器学习模型。
用例¶
让我们从一个示例 **用例** 开始,然后看看如何用它来解决。
假设你有一些 **机器学习模型**,你想用它们来处理请求。🤖
相同的模型在请求之间共享,因此不是每个请求一个模型,也不是每个用户一个模型,或者类似的东西。
假设加载模型可能 **需要相当长的时间**,因为它需要从 **磁盘** 中读取大量 **数据**。因此,你不想在每个请求时都这样做。
你可以在模块/文件的顶层加载它,但这也会意味着即使你只是运行一个简单的自动化测试,它也会 **加载模型**,那么该测试将 **很慢**,因为它必须等待模型加载才能运行代码的独立部分。
这就是我们将要解决的问题,让我们在处理请求之前加载模型,但只在应用程序开始接收请求之前加载,而不是在代码加载时加载。
生命周期¶
你可以使用 FastAPI
应用程序的 lifespan
参数和“上下文管理器”来定义这种 *启动* 和 *关闭* 逻辑(我将在下面展示什么是上下文管理器)。
让我们从一个示例开始,然后详细了解它。
我们用 yield
创建一个异步函数 lifespan()
,如下所示
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
在这里,我们通过将 (模拟的) 模型函数放在带有机器学习模型的字典中,在 yield
之前模拟加载模型的昂贵 *启动* 操作。这段代码将在应用程序 **开始处理请求** **之前** 执行,在 *启动* 期间。
然后,紧接在 yield
之后,我们卸载模型。这段代码将在应用程序 **完成处理请求** **之后** 执行,在 *关闭* 之前。例如,这可以释放资源,如内存或 GPU。
提示
shutdown
将在你在 **停止** 应用程序时发生。
也许你需要启动一个新版本,或者你只是厌倦了运行它。🤷
生命周期函数¶
首先要注意的是,我们使用 yield
定义了一个异步函数。这与带有 yield
的依赖项非常相似。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
函数的第一部分,在 yield
之前,将在应用程序启动 **之前** 执行。
yield
之后的代码将在应用程序完成 **之后** 执行。
异步上下文管理器¶
如果你查看,这个函数是用 @asynccontextmanager
装饰的。
它将函数转换为一个叫做 "**异步上下文管理器**" 的东西。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
Python 中的 **上下文管理器** 是你可以在 with
语句中使用的东西,例如,open()
可以用作上下文管理器
with open("file.txt") as file:
file.read()
在 Python 的最近版本中,也存在 **异步上下文管理器**。 你可以用 async with
来使用它
async with lifespan(app):
await do_stuff()
当你创建上下文管理器或异步上下文管理器时,它所做的是,在进入 with
代码块之前,它会执行 yield
之前的代码,并在退出 with
代码块之后,它会执行 yield
之后的代码。
在我们上面的代码示例中,我们没有直接使用它,而是将它传递给 FastAPI 以供它使用。
FastAPI
应用程序的 lifespan
参数接受一个 **异步上下文管理器**,因此我们可以将我们新的 lifespan
异步上下文管理器传递给它。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
替代事件(已弃用)¶
警告
处理启动 和 关闭 的推荐方法是使用上面描述的 FastAPI
应用程序的 lifespan
参数。 如果你提供 lifespan
参数,startup
和 shutdown
事件处理程序将不再被调用。 它是全部 lifespan
或全部事件,而不是两者兼而有之。
你可能可以跳过这部分。
有一种替代方法来定义此逻辑,以便在启动 和关闭 期间执行。
你可以定义事件处理程序(函数),这些函数需要在应用程序启动之前或应用程序关闭时执行。
这些函数可以用 async def
或普通的 def
声明。
startup
事件¶
要添加一个应该在应用程序启动之前运行的函数,用事件 "startup"
声明它
from fastapi import FastAPI
app = FastAPI()
items = {}
@app.on_event("startup")
async def startup_event():
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}
@app.get("/items/{item_id}")
async def read_items(item_id: str):
return items[item_id]
在这种情况下,startup
事件处理程序函数将用一些值初始化项目 "database"(只是一个 dict
)。
你可以添加多个事件处理程序函数。
你的应用程序只有在所有 startup
事件处理程序完成之后才会开始接收请求。
shutdown
事件¶
要添加一个应该在应用程序关闭时运行的函数,用事件 "shutdown"
声明它
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
def shutdown_event():
with open("log.txt", mode="a") as log:
log.write("Application shutdown")
@app.get("/items/")
async def read_items():
return [{"name": "Foo"}]
在这里,shutdown
事件处理程序函数将写入文本行 "Application shutdown"
到文件 log.txt
。
信息
在 open()
函数中,mode="a"
表示 "追加",因此,该行将在该文件中现有的内容之后添加,不会覆盖以前的内容。
提示
请注意,在这种情况下,我们使用的是标准的 Python open()
函数,它与文件交互。
因此,它涉及 I/O(输入/输出),这需要 "等待" 将内容写入磁盘。
但是 open()
不使用 async
和 await
。
因此,我们用标准的 def
而不是 async def
声明事件处理程序函数。
startup
和 shutdown
结合使用¶
你很可能需要将你的启动 和关闭 逻辑连接起来,你可能需要启动某些东西然后完成它,获取资源然后释放它,等等。
在不共享逻辑或变量的单独函数中执行此操作会更加困难,因为你将需要将值存储在全局变量中或使用类似的技巧。
因此,现在建议改为使用上面解释的 lifespan
。
技术细节¶
只是一些技术细节,供好奇的极客们参考。🤓
在 ASGI 技术规范下,这是 生命周期协议 的一部分,它定义了称为 startup
和 shutdown
的事件。
子应用程序¶
🚨 请记住,这些生命周期事件(启动和关闭)只对主应用程序执行,不对 子应用程序 - 挂载 执行。