后台任务¶
您可以定义后台任务,使其在返回响应后运行。
这对于需要在请求完成后执行,但客户端不必等待操作完成即可接收响应的操作很有用。
例如,这包括
- 执行操作后发送的电子邮件通知
- 由于连接到邮件服务器并发送邮件可能很“慢”(需要几秒钟),因此您可以立即返回响应,并在后台发送电子邮件通知。
- 处理数据
- 例如,假设您收到一个文件,该文件必须经过一个缓慢的处理过程,您可以返回一个“已接受”(HTTP 202)的响应,并在后台处理该文件。
使用 BackgroundTasks¶
首先,导入 BackgroundTasks 并在您的路径操作函数中定义一个参数,其类型声明为 BackgroundTasks
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
FastAPI 将为您创建 BackgroundTasks 类型的对象,并将其作为该参数传递。
创建任务函数¶
创建一个函数作为后台任务运行。
它只是一个可以接收参数的标准函数。
它可以是 async def 或普通 def 函数,FastAPI 将知道如何正确处理它。
在这种情况下,任务函数将写入文件(模拟发送电子邮件)。
由于写入操作不使用 async 和 await,因此我们使用普通 def 定义函数
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
添加后台任务¶
在您的路径操作函数内部,通过 .add_task() 方法将您的任务函数传递给后台任务对象。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
.add_task() 接收以下参数
- 一个将在后台运行的任务函数(
write_notification)。 - 任何应按顺序传递给任务函数的参数序列(
email)。 - 任何应传递给任务函数的关键字参数(
message="some notification")。
依赖注入¶
使用 BackgroundTasks 也适用于依赖注入系统,您可以在多个级别声明一个类型为 BackgroundTasks 的参数:在路径操作函数中,在依赖项(dependable)中,在子依赖项中,依此类推。
FastAPI 知道在每种情况下该做什么以及如何重用同一对象,因此所有后台任务都会合并在一起,并在之后在后台运行。
from typing import Annotated
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
🤓 其他版本和变体
from typing import Annotated, Union
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
提示
如果可能,请优先使用 Annotated 版本。
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
提示
如果可能,请优先使用 Annotated 版本。
from typing import Union
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
在此示例中,消息将在响应发送之后写入 log.txt 文件。
如果请求中有一个查询参数,它将在后台任务中写入日志。
然后,在路径操作函数中生成的另一个后台任务将使用 email 路径参数写入消息。
技术细节¶
BackgroundTasks 类直接来自 starlette.background。
它被直接导入/包含到 FastAPI 中,以便您可以从 fastapi 导入它,并避免意外地从 starlette.background 导入同名的 BackgroundTask(结尾没有 s)。
仅使用 BackgroundTasks(而不是 BackgroundTask),您就可以将其用作路径操作函数的参数,并让 FastAPI 为您处理其余的事情,就像直接使用 Request 对象一样。
仍然可以在 FastAPI 中单独使用 BackgroundTask,但您必须在代码中创建该对象,并返回一个包含它的 Starlette Response。
您可以在 Starlette 的后台任务官方文档中找到更多详细信息。
注意事项¶
如果您需要执行繁重的后台计算,并且不一定需要由同一个进程运行(例如,您不需要共享内存、变量等),那么使用 Celery 等更强大的工具可能会让您受益。
它们通常需要更复杂的配置、消息/作业队列管理器,如 RabbitMQ 或 Redis,但它们允许您在多个进程中运行后台任务,尤其是在多台服务器上。
但是,如果您需要访问同一个 FastAPI 应用程序中的变量和对象,或者需要执行小型后台任务(如发送电子邮件通知),您可以简单地使用 BackgroundTasks。
总结¶
在路径操作函数和依赖项中导入并使用 BackgroundTasks 来添加后台任务。