跳到内容

后台任务

您可以定义后台任务,使其在返回响应后运行。

这对于需要在请求完成后执行,但客户端不必等待操作完成即可接收响应的操作很有用。

例如,这包括

  • 执行操作后发送的电子邮件通知
    • 由于连接到邮件服务器并发送邮件可能很“慢”(需要几秒钟),因此您可以立即返回响应,并在后台发送电子邮件通知。
  • 处理数据
    • 例如,假设您收到一个文件,该文件必须经过一个缓慢的处理过程,您可以返回一个“已接受”(HTTP 202)的响应,并在后台处理该文件。

使用 BackgroundTasks

首先,导入 BackgroundTasks 并在您的路径操作函数中定义一个参数,其类型声明为 BackgroundTasks

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI 将为您创建 BackgroundTasks 类型的对象,并将其作为该参数传递。

创建任务函数

创建一个函数作为后台任务运行。

它只是一个可以接收参数的标准函数。

它可以是 async def 或普通 def 函数,FastAPI 将知道如何正确处理它。

在这种情况下,任务函数将写入文件(模拟发送电子邮件)。

由于写入操作不使用 asyncawait,因此我们使用普通 def 定义函数

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

添加后台任务

在您的路径操作函数内部,通过 .add_task() 方法将您的任务函数传递给后台任务对象。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task() 接收以下参数

  • 一个将在后台运行的任务函数(write_notification)。
  • 任何应按顺序传递给任务函数的参数序列(email)。
  • 任何应传递给任务函数的关键字参数(message="some notification")。

依赖注入

使用 BackgroundTasks 也适用于依赖注入系统,您可以在多个级别声明一个类型为 BackgroundTasks 的参数:在路径操作函数中,在依赖项(dependable)中,在子依赖项中,依此类推。

FastAPI 知道在每种情况下该做什么以及如何重用同一对象,因此所有后台任务都会合并在一起,并在之后在后台运行。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
🤓 其他版本和变体
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,请优先使用 Annotated 版本。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,请优先使用 Annotated 版本。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

在此示例中,消息将在响应发送之后写入 log.txt 文件。

如果请求中有一个查询参数,它将在后台任务中写入日志。

然后,在路径操作函数中生成的另一个后台任务将使用 email 路径参数写入消息。

技术细节

BackgroundTasks 类直接来自 starlette.background

它被直接导入/包含到 FastAPI 中,以便您可以从 fastapi 导入它,并避免意外地从 starlette.background 导入同名的 BackgroundTask(结尾没有 s)。

仅使用 BackgroundTasks(而不是 BackgroundTask),您就可以将其用作路径操作函数的参数,并让 FastAPI 为您处理其余的事情,就像直接使用 Request 对象一样。

仍然可以在 FastAPI 中单独使用 BackgroundTask,但您必须在代码中创建该对象,并返回一个包含它的 Starlette Response

您可以在 Starlette 的后台任务官方文档中找到更多详细信息。

注意事项

如果您需要执行繁重的后台计算,并且不一定需要由同一个进程运行(例如,您不需要共享内存、变量等),那么使用 Celery 等更强大的工具可能会让您受益。

它们通常需要更复杂的配置、消息/作业队列管理器,如 RabbitMQ 或 Redis,但它们允许您在多个进程中运行后台任务,尤其是在多台服务器上。

但是,如果您需要访问同一个 FastAPI 应用程序中的变量和对象,或者需要执行小型后台任务(如发送电子邮件通知),您可以简单地使用 BackgroundTasks

总结

路径操作函数和依赖项中导入并使用 BackgroundTasks 来添加后台任务。