跳至内容

后台任务

您可以定义在返回响应后运行的后台任务。

这对于需要在请求后发生的但客户端不必在收到响应之前等待操作完成的操作很有用。

例如,这包括

  • 执行操作后发送的电子邮件通知
    • 由于连接到电子邮件服务器并发送电子邮件往往“缓慢”(几秒钟),因此您可以立即返回响应并在后台发送电子邮件通知。
  • 处理数据
    • 例如,假设您收到一个必须经过缓慢处理的文件,您可以返回“已接受”(HTTP 202)的响应并在后台处理该文件。

使用 BackgroundTasks

首先,导入 BackgroundTasks 并在您的路径操作函数中定义一个参数,其类型声明为 BackgroundTasks

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI 将为您创建类型为 BackgroundTasks 的对象并将其作为该参数传递。

创建任务函数

创建一个作为后台任务运行的函数。

它只是一个可以接收参数的标准函数。

它可以是 async def 或普通的 def 函数,FastAPI 将知道如何正确处理它。

在本例中,任务函数将写入文件(模拟发送电子邮件)。

并且由于写入操作不使用 asyncawait,因此我们使用普通的 def 定义函数

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

添加后台任务

在您的路径操作函数中,使用 .add_task() 方法将您的任务函数传递给后台任务对象

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task() 接收以下参数

  • 要在后台运行的任务函数(write_notification)。
  • 应按顺序传递给任务函数的任何参数序列(email)。
  • 应传递给任务函数的任何关键字参数(message="some notification")。

依赖注入

使用 BackgroundTasks 也适用于依赖注入系统,您可以在多个级别声明类型为 BackgroundTasks 的参数:在路径操作函数中,在依赖项(可依赖项)中,在子依赖项中,等等。

FastAPI 知道在每种情况下该做什么以及如何重用同一个对象,以便所有后台任务都合并在一起并在之后在后台运行

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,最好使用 Annotated 版本。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

提示

如果可能,最好使用 Annotated 版本。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

在此示例中,消息将在发送响应写入 log.txt 文件。

如果请求中存在查询,它将在后台任务中写入日志。

然后,在路径操作函数中生成的另一个后台任务将使用 email 路径参数写入消息。

技术细节

BackgroundTasks 直接来自 starlette.background

它被直接导入/包含到 FastAPI 中,以便您可以从fastapi中导入它,并避免意外地从starlette.background中导入备用的BackgroundTask(末尾没有“s”)。

通过仅使用BackgroundTasks(而不是BackgroundTask),就可以将其用作路径操作函数参数,并让**FastAPI**为您处理其余部分,就像直接使用Request对象一样。

仍然可以在 FastAPI 中单独使用BackgroundTask,但是您必须在代码中创建对象并返回包含它的 Starlette Response

您可以在Starlette 的背景任务官方文档中查看更多详细信息。

注意事项

如果您需要执行繁重的后台计算,并且不一定需要由同一个进程运行(例如,您不需要共享内存、变量等),那么您可能受益于使用其他更大的工具,例如Celery

它们往往需要更复杂的配置,一个消息/作业队列管理器,如 RabbitMQ 或 Redis,但它们允许您在多个进程中运行后台任务,尤其是在多个服务器中。

要查看示例,请查看项目生成器,它们都已包含已配置的 Celery。

但是,如果您需要访问来自同一个**FastAPI**应用程序的变量和对象,或者您需要执行小的后台任务(例如发送电子邮件通知),您可以简单地使用BackgroundTasks

总结

导入并使用BackgroundTasks以及路径操作函数和依赖项中的参数来添加后台任务。