跳到内容

生命周期事件

你可以定义在应用程序 **启动** 之前应该执行的逻辑(代码)。这意味着这段代码将 **执行一次**,在应用程序 **开始接收请求** **之前** 执行。

同样,你也可以定义在应用程序 **关闭** 时应该执行的逻辑(代码)。在这种情况下,这段代码将 **执行一次**,在可能处理完 **多个请求** **之后** 执行。

因为这段代码在应用程序 **开始** 处理请求之前执行,并且在它 **完成** 处理请求之后立即执行,所以它涵盖了整个应用程序 **生命周期**(“生命周期” 这个词在后面会很重要 😉)。

这对于设置你整个应用程序需要使用的 **资源** 非常有用,这些资源在请求之间是 **共享** 的,或者你之后需要 **清理** 它们。例如,数据库连接池,或者加载共享的机器学习模型。

用例

让我们从一个示例 **用例** 开始,然后看看如何用它来解决。

假设你有一些 **机器学习模型**,你想用它们来处理请求。🤖

相同的模型在请求之间共享,因此不是每个请求一个模型,也不是每个用户一个模型,或者类似的东西。

假设加载模型可能 **需要相当长的时间**,因为它需要从 **磁盘** 中读取大量 **数据**。因此,你不想在每个请求时都这样做。

你可以在模块/文件的顶层加载它,但这也会意味着即使你只是运行一个简单的自动化测试,它也会 **加载模型**,那么该测试将 **很慢**,因为它必须等待模型加载才能运行代码的独立部分。

这就是我们将要解决的问题,让我们在处理请求之前加载模型,但只在应用程序开始接收请求之前加载,而不是在代码加载时加载。

生命周期

你可以使用 FastAPI 应用程序的 lifespan 参数和“上下文管理器”来定义这种 *启动* 和 *关闭* 逻辑(我将在下面展示什么是上下文管理器)。

让我们从一个示例开始,然后详细了解它。

我们用 yield 创建一个异步函数 lifespan(),如下所示

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

在这里,我们通过将 (模拟的) 模型函数放在带有机器学习模型的字典中,在 yield 之前模拟加载模型的昂贵 *启动* 操作。这段代码将在应用程序 **开始处理请求** **之前** 执行,在 *启动* 期间。

然后,紧接在 yield 之后,我们卸载模型。这段代码将在应用程序 **完成处理请求** **之后** 执行,在 *关闭* 之前。例如,这可以释放资源,如内存或 GPU。

提示

shutdown 将在你在 **停止** 应用程序时发生。

也许你需要启动一个新版本,或者你只是厌倦了运行它。🤷

生命周期函数

首先要注意的是,我们使用 yield 定义了一个异步函数。这与带有 yield 的依赖项非常相似。

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

函数的第一部分,在 yield 之前,将在应用程序启动 **之前** 执行。

yield 之后的代码将在应用程序完成 **之后** 执行。

异步上下文管理器

如果你查看,这个函数是用 @asynccontextmanager 装饰的。

它将函数转换为一个叫做 "**异步上下文管理器**" 的东西。

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

Python 中的 **上下文管理器** 是你可以在 with 语句中使用的东西,例如,open() 可以用作上下文管理器

with open("file.txt") as file:
    file.read()

在 Python 的最近版本中,也存在 **异步上下文管理器**。 你可以用 async with 来使用它

async with lifespan(app):
    await do_stuff()

当你创建上下文管理器或异步上下文管理器时,它所做的是,在进入 with 代码块之前,它会执行 yield 之前的代码,并在退出 with 代码块之后,它会执行 yield 之后的代码。

在我们上面的代码示例中,我们没有直接使用它,而是将它传递给 FastAPI 以供它使用。

FastAPI 应用程序的 lifespan 参数接受一个 **异步上下文管理器**,因此我们可以将我们新的 lifespan 异步上下文管理器传递给它。

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

替代事件(已弃用)

警告

处理启动关闭 的推荐方法是使用上面描述的 FastAPI 应用程序的 lifespan 参数。 如果你提供 lifespan 参数,startupshutdown 事件处理程序将不再被调用。 它是全部 lifespan 或全部事件,而不是两者兼而有之。

你可能可以跳过这部分。

有一种替代方法来定义此逻辑,以便在启动关闭 期间执行。

你可以定义事件处理程序(函数),这些函数需要在应用程序启动之前或应用程序关闭时执行。

这些函数可以用 async def 或普通的 def 声明。

startup 事件

要添加一个应该在应用程序启动之前运行的函数,用事件 "startup" 声明它

from fastapi import FastAPI

app = FastAPI()

items = {}


@app.on_event("startup")
async def startup_event():
    items["foo"] = {"name": "Fighters"}
    items["bar"] = {"name": "Tenders"}


@app.get("/items/{item_id}")
async def read_items(item_id: str):
    return items[item_id]

在这种情况下,startup 事件处理程序函数将用一些值初始化项目 "database"(只是一个 dict)。

你可以添加多个事件处理程序函数。

你的应用程序只有在所有 startup 事件处理程序完成之后才会开始接收请求。

shutdown 事件

要添加一个应该在应用程序关闭时运行的函数,用事件 "shutdown" 声明它

from fastapi import FastAPI

app = FastAPI()


@app.on_event("shutdown")
def shutdown_event():
    with open("log.txt", mode="a") as log:
        log.write("Application shutdown")


@app.get("/items/")
async def read_items():
    return [{"name": "Foo"}]

在这里,shutdown 事件处理程序函数将写入文本行 "Application shutdown" 到文件 log.txt

信息

open() 函数中,mode="a" 表示 "追加",因此,该行将在该文件中现有的内容之后添加,不会覆盖以前的内容。

提示

请注意,在这种情况下,我们使用的是标准的 Python open() 函数,它与文件交互。

因此,它涉及 I/O(输入/输出),这需要 "等待" 将内容写入磁盘。

但是 open() 不使用 asyncawait

因此,我们用标准的 def 而不是 async def 声明事件处理程序函数。

startupshutdown 结合使用

你很可能需要将你的启动关闭 逻辑连接起来,你可能需要启动某些东西然后完成它,获取资源然后释放它,等等。

在不共享逻辑或变量的单独函数中执行此操作会更加困难,因为你将需要将值存储在全局变量中或使用类似的技巧。

因此,现在建议改为使用上面解释的 lifespan

技术细节

只是一些技术细节,供好奇的极客们参考。🤓

在 ASGI 技术规范下,这是 生命周期协议 的一部分,它定义了称为 startupshutdown 的事件。

信息

你可以在 Starlette 的生命周期文档 中阅读更多关于 Starlette lifespan 处理程序的信息。

包括如何处理可以在代码的其他地方使用的生命周期状态。

子应用程序

🚨 请记住,这些生命周期事件(启动和关闭)只对主应用程序执行,不对 子应用程序 - 挂载 执行。