WebSockets

When defining WebSockets, you normally declare a parameter of type WebSocket and use it to read data from the client and send data to it.

It is provided directly by Starlette, but you can import it from fastapi.

from fastapi import WebSocket

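Tip

When you want to define dependencies that should be compatible with both HTTP and WebSockets, you can define a parameter that takes an HTTPConnection instead of a Request or a WebSocket.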

fastapi.WebSocket

WebSocket(scope, receive, send)

Bases: HTTPConnection

Parameter    Description
scope        Type: Scope
receive      Type: Receive
send         Type: Send

Source code in starlette/websockets.py

def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
    super().__init__(scope)
    assert scope["type"] == "websocket"
    self._receive = receive
    self._send = send
    self.client_state = WebSocketState.CONNECTING
    self.application_state = WebSocketState.CONNECTING

scope instance-attribute

scope = scope

app property

app

url property

url

base_url property

base_url

headers property

headers

query_params property

query_params

path_params property

path_params

cookies property

cookies

client property

client

state property

state

client_state instance-attribute

client_state = CONNECTING

application_state instance-attribute

application_state = CONNECTING

url_for

url_for(name, /, **path_params)

Parameter        Description
name             Type: str
**path_params    Type: Any, default: {}

Source code in starlette/requests.py

def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
    router: Router = self.scope["router"]
    url_path = router.url_path_for(name, **path_params)
    return url_path.make_absolute_url(base_url=self.base_url)

receive async

receive()

Receive ASGI WebSocket messages, ensuring valid state transitions.

Source code in starlette/websockets.py

async def receive(self) -> Message:
    """
    Receive ASGI websocket messages, ensuring valid state transitions.
    """
    if self.client_state == WebSocketState.CONNECTING:
        message = await self._receive()
        message_type = message["type"]
        if message_type != "websocket.connect":
            raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {message_type!r}')
        self.client_state = WebSocketState.CONNECTED
        return message
    elif self.client_state == WebSocketState.CONNECTED:
        message = await self._receive()
        message_type = message["type"]
        if message_type not in {"websocket.receive", "websocket.disconnect"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {message_type!r}'
            )
        if message_type == "websocket.disconnect":
            self.client_state = WebSocketState.DISCONNECTED
        return message
    else:
        raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')

send async

send(message)

Send ASGI WebSocket messages, ensuring valid state transitions.

Parameter    Description
message      Type: Message

Source code in starlette/websockets.py

async def send(self, message: Message) -> None:
    """
    Send ASGI websocket messages, ensuring valid state transitions.
    """
    if self.application_state == WebSocketState.CONNECTING:
        message_type = message["type"]
        if message_type not in {"websocket.accept", "websocket.close", "websocket.http.response.start"}:
            raise RuntimeError(
                'Expected ASGI message "websocket.accept", "websocket.close" or "websocket.http.response.start", '
                f"but got {message_type!r}"
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        elif message_type == "websocket.http.response.start":
            self.application_state = WebSocketState.RESPONSE
        else:
            self.application_state = WebSocketState.CONNECTED
        await self._send(message)
    elif self.application_state == WebSocketState.CONNECTED:
        message_type = message["type"]
        if message_type not in {"websocket.send", "websocket.close"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        try:
            await self._send(message)
        except OSError:
            self.application_state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(code=1006)
    elif self.application_state == WebSocketState.RESPONSE:
        message_type = message["type"]
        if message_type != "websocket.http.response.body":
            raise RuntimeError(f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}')
        if not message.get("more_body", False):
            self.application_state = WebSocketState.DISCONNECTED
        await self._send(message)
    else:
        raise RuntimeError('Cannot call "send" once a close message has been sent.')

accept async

accept(subprotocol=None, headers=None)

Parameter      Description
subprotocol    Type: str | None, default: None
headers        Type: Iterable[tuple[bytes, bytes]] | None, default: None

Source code in starlette/websockets.py

async def accept(
    self,
    subprotocol: str | None = None,
    headers: typing.Iterable[tuple[bytes, bytes]] | None = None,
) -> None:
    headers = headers or []

    if self.client_state == WebSocketState.CONNECTING:
        # If we haven't yet seen the 'connect' message, then wait for it first.
        await self.receive()
    await self.send({"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers})

receive_text async

receive_text()

Source code in starlette/websockets.py

async def receive_text(self) -> str:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return typing.cast(str, message["text"])

receive_bytes async

receive_bytes()

Source code in starlette/websockets.py

async def receive_bytes(self) -> bytes:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return typing.cast(bytes, message["bytes"])

receive_json async

receive_json(mode='text')

Parameter    Description
mode         Type: str, default: 'text'

Source code in starlette/websockets.py

async def receive_json(self, mode: str = "text") -> typing.Any:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)

    if mode == "text":
        text = message["text"]
    else:
        text = message["bytes"].decode("utf-8")
    return json.loads(text)

iter_text async

iter_text()

Source code in starlette/websockets.py

async def iter_text(self) -> typing.AsyncIterator[str]:
    try:
        while True:
            yield await self.receive_text()
    except WebSocketDisconnect:
        pass

iter_bytes async

iter_bytes()

Source code in starlette/websockets.py

async def iter_bytes(self) -> typing.AsyncIterator[bytes]:
    try:
        while True:
            yield await self.receive_bytes()
    except WebSocketDisconnect:
        pass

iter_json async

iter_json()

Source code in starlette/websockets.py

async def iter_json(self) -> typing.AsyncIterator[typing.Any]:
    try:
        while True:
            yield await self.receive_json()
    except WebSocketDisconnect:
        pass

send_text async

send_text(data)

Parameter    Description
data         Type: str

Source code in starlette/websockets.py

async def send_text(self, data: str) -> None:
    await self.send({"type": "websocket.send", "text": data})

send_bytes async

send_bytes(data)

Parameter    Description
data         Type: bytes

Source code in starlette/websockets.py

async def send_bytes(self, data: bytes) -> None:
    await self.send({"type": "websocket.send", "bytes": data})

send_json async

send_json(data, mode='text')

Parameter    Description
data         Type: Any
mode         Type: str, default: 'text'

Source code in starlette/websockets.py

async def send_json(self, data: typing.Any, mode: str = "text") -> None:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    if mode == "text":
        await self.send({"type": "websocket.send", "text": text})
    else:
        await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
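
The json.dumps arguments in the source above produce a compact payload that keeps non-ASCII characters as they are. The standard-library sketch below compares that with the default encoding; the data dictionary is invented for the example.

```python
import json

data = {"greeting": "你好", "n": 1}

# Default json.dumps: spaces after separators, non-ASCII escaped.
default_text = json.dumps(data)

# Same arguments as the send_json source shown above.
compact_text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)

# In "binary" mode the same text is sent as UTF-8 encoded bytes.
binary_payload = compact_text.encode("utf-8")

print(default_text)  # {"greeting": "\u4f60\u597d", "n": 1}
print(compact_text)  # {"greeting":"你好","n":1}
print(len(default_text), len(compact_text))
```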

close async

close(code=1000, reason=None)

Parameter    Description
code         Type: int, default: 1000
reason       Type: str | None, default: None

Source code in starlette/websockets.py

async def close(self, code: int = 1000, reason: str | None = None) -> None:
    await self.send({"type": "websocket.close", "code": code, "reason": reason or ""})

When a client disconnects, a WebSocketDisconnect exception is raised, and you can catch it.

You can import it directly from fastapi.

from fastapi import WebSocketDisconnect

fastapi.WebSocketDisconnect

WebSocketDisconnect(code=1000, reason=None)

Bases: Exception

Parameter    Description
code         Type: int, default: 1000
reason       Type: str | None, default: None

Source code in starlette/websockets.py

def __init__(self, code: int = 1000, reason: str | None = None) -> None:
    self.code = code
    self.reason = reason or ""

code instance-attribute

code = code

reason instance-attribute

reason = reason or ''

WebSockets - Other Classes

Other classes for handling WebSockets.

They are provided directly by Starlette, but you can import them from fastapi.

from fastapi.websockets import WebSocketDisconnect, WebSocketState

fastapi.websockets.WebSocketDisconnect

WebSocketDisconnect(code=1000, reason=None)

Bases: Exception

Parameter    Description
code         Type: int, default: 1000
reason       Type: str | None, default: None

Source code in starlette/websockets.py

def __init__(self, code: int = 1000, reason: str | None = None) -> None:
    self.code = code
    self.reason = reason or ""

code instance-attribute

code = code

reason instance-attribute

reason = reason or ''

fastapi.websockets.WebSocketState

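Bases: Enum

CONNECTING class-attribute instance-attribute

CONNECTING = 0

CONNECTED class-attribute instance-attribute

CONNECTED = 1

DISCONNECTED class-attribute instance-attribute

DISCONNECTED = 2

RESPONSE class-attribute instance-attribute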

RESPONSE = 3