跳到内容

流式传输 JSON Lines

如果你有一连串的数据想要以“”的形式发送,可以使用 JSON Lines 来实现。

信息

在 FastAPI 0.134.0 版本中添加。

什么是流?

流式传输”数据意味着你的应用将开始向客户端发送数据项,而无需等待所有数据项全部准备就绪。

因此,它会发送第一项,客户端在接收并开始处理它的同时,你可能还在生产下一项。

sequenceDiagram
    participant App
    participant Client

    App->>App: Produce Item 1
    App->>Client: Send Item 1
    App->>App: Produce Item 2
    Client->>Client: Process Item 1
    App->>Client: Send Item 2
    App->>App: Produce Item 3
    Client->>Client: Process Item 2
    App->>Client: Send Item 3
    Client->>Client: Process Item 3
    Note over App: Keeps producing...
    Note over Client: Keeps consuming...

它甚至可以是一个无限流,你可以持续不断地发送数据。

JSON Lines

在这种情况下,通常会发送“JSON Lines”,这是一种每行发送一个 JSON 对象的格式。

响应的内容类型 (Content-Type) 应为 application/jsonl(而不是 application/json),主体大致如下:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

它非常类似于 JSON 数组(相当于 Python 列表),但它不是被 [] 包裹且项之间用 , 分隔,而是每行一个 JSON 对象,它们由换行符分隔。

信息

重点在于你的应用能够依次生成每一行,而客户端则同时消费之前的行。

技术细节

由于每个 JSON 对象都由换行符分隔,它们的内容中不能包含字面换行符,但可以包含转义后的换行符 (\n),这是 JSON 标准的一部分。

但通常你无需担心这一点,它是自动处理的,请继续阅读。 🤓

应用场景

你可以使用这种方式从 AI 大模型 (LLM) 服务、日志遥测数据或任何可以结构化为 JSON 项的其他类型数据中流式传输数据。

提示

如果你想流式传输二进制数据(例如视频或音频),请查看高级指南:流式传输数据 (Stream Data)

使用 FastAPI 流式传输 JSON Lines

要使用 FastAPI 流式传输 JSON Lines,你可以在路径操作函数中使用 yield 来依次生成每一项,而不是使用 return

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 完整文件预览
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

如果你想要返回的每个 JSON 项都是 Item 类型(Pydantic 模型)且函数是异步的,你可以将返回类型声明为 AsyncIterable[Item]

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 完整文件预览
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

如果你声明了返回类型,FastAPI 将使用它来验证数据、在 OpenAPI 中记录它、过滤它,并使用 Pydantic 进行序列化

提示

由于 Pydantic 会在 Rust 端进行序列化,因此你会获得比不声明返回类型高得多的性能

非异步 路径操作函数

你也可以使用普通的 def 函数(不带 async),并以同样的方式使用 yield

FastAPI 会确保它正确运行,从而不会阻塞事件循环。

如果该函数不是异步的,则正确的返回类型应为 Iterable[Item]

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 完整文件预览
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

无返回类型

你也可以省略返回类型。此时 FastAPI 将使用 jsonable_encoder 将数据转换为可以序列化为 JSON 的格式,然后将其作为 JSON Lines 发送。

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 完整文件预览
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

服务器发送事件 (SSE)

FastAPI 对服务器发送事件 (SSE) 也有内置的一等支持,它们非常相似,但多了一些额外的细节。你可以在下一章中了解它们:服务器发送事件 (SSE)。 🤓