跳到内容

后台任务

你可以定义后台任务,让它们在返回响应 之后 运行。

这对于那些需要在请求之后发生,但客户端不必等待操作完成即可接收响应的任务非常有用。

例如,这包括:

  • 执行操作后发送电子邮件通知
    • 由于连接到电子邮件服务器并发送电子邮件通常很“慢”(几秒钟),你可以立即返回响应,并在后台发送电子邮件通知。
  • 处理数据
    • 例如,假设你收到一个必须经过缓慢处理的文件,你可以返回一个“已接受”响应(HTTP 202),并在后台处理该文件。

使用 BackgroundTasks

首先,导入 BackgroundTasks 并在你的 路径操作函数 中定义一个类型声明为 BackgroundTasks 的参数。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI 会为你创建 BackgroundTasks 类型的对象并将其作为该参数传递。

创建任务函数

创建一个函数作为后台任务运行。

它只是一个可以接收参数的标准函数。

它可以是 async def 或普通的 def 函数,FastAPI 将知道如何正确处理它。

在这种情况下,任务函数将写入文件(模拟发送电子邮件)。

由于写入操作不使用 asyncawait,我们使用普通的 def 定义函数。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

添加后台任务

在你的 路径操作函数 内部,使用 .add_task() 方法将你的任务函数传递给 后台任务 对象。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task() 接收以下参数:

  • 一个在后台运行的任务函数 (write_notification)。
  • 任何应按顺序传递给任务函数的参数序列 (email)。
  • 任何应传递给任务函数的关键字参数 (message="some notification")。

依赖注入

使用 BackgroundTasks 也适用于依赖注入系统,你可以在多个层面声明 BackgroundTasks 类型的参数:在 路径操作函数 中、在依赖项 (dependable) 中、在子依赖项中等。

FastAPI 知道在每种情况下该怎么做,以及如何复用同一个对象,以便所有后台任务合并在一起并在之后于后台运行。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
🤓 其他版本和变体
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,请优先使用 Annotated 版本。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,请优先使用 Annotated 版本。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

在这个例子中,消息将在响应发送 之后 写入 log.txt 文件。

如果请求中包含查询,它将在后台任务中写入日志。

然后,在 路径操作函数 生成的另一个后台任务将使用 email 路径参数写入一条消息。

技术细节

BackgroundTasks 类直接来自 starlette.background

它被直接导入/包含在 FastAPI 中,这样你就可以从 fastapi 导入它,并避免意外地从 starlette.background 导入替代的 BackgroundTask(末尾没有 s)。

通过只使用 BackgroundTasks(而不是 BackgroundTask),就可以将其用作 路径操作函数 的参数,并让 FastAPI 为你处理其余部分,就像直接使用 Request 对象一样。

在 FastAPI 中仍然可以单独使用 BackgroundTask,但你必须在代码中创建对象并返回包含它的 Starlette Response

你可以在 Starlette 官方的后台任务文档 中查看更多详情。

注意事项

如果你需要执行繁重的后台计算,并且不一定需要它由同一进程运行(例如,你不需要共享内存、变量等),那么你可能会受益于使用其他更大型的工具,如 Celery

它们通常需要更复杂的配置、一个消息/任务队列管理器(如 RabbitMQ 或 Redis),但它们允许你在多个进程,特别是多个服务器上运行后台任务。

但是,如果你需要访问来自同一个 FastAPI 应用程序的变量和对象,或者你需要执行小的后台任务(如发送电子邮件通知),你可以直接使用 BackgroundTasks

总结

导入并在 路径操作函数 和依赖项中使用带有参数的 BackgroundTasks 来添加后台任务。