WebSockets
定义 WebSocket 端点时,通常会声明一个 `WebSocket` 类型的参数,通过它可以从客户端读取数据并向客户端发送数据。
它由 Starlette 直接提供,但你可以从 `fastapi` 导入它。
from fastapi import WebSocket
提示
当你想定义同时兼容 HTTP 和 WebSockets 的依赖项时,可以定义一个接收 `HTTPConnection` 的参数,而不是 `Request` 或 `WebSocket`。
fastapi.WebSocket
WebSocket(scope, receive, send)
继承:HTTPConnection
源代码位于 starlette/websockets.py
| def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
super().__init__(scope)
assert scope["type"] == "websocket"
self._receive = receive
self._send = send
self.client_state = WebSocketState.CONNECTING
self.application_state = WebSocketState.CONNECTING
|
url_for
url_for(name, /, **path_params)
源代码位于 starlette/requests.py
| def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
url_path_provider: Router | Starlette | None = self.scope.get("router") or self.scope.get("app")
if url_path_provider is None:
raise RuntimeError("The `url_for` method can only be used inside a Starlette application or with a router.")
url_path = url_path_provider.url_path_for(name, **path_params)
return url_path.make_absolute_url(base_url=self.base_url)
|
receive 异步
接收 ASGI WebSocket 消息,确保状态转换有效。
源代码位于 starlette/websockets.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 | async def receive(self) -> Message:
"""
Receive ASGI websocket messages, ensuring valid state transitions.
"""
if self.client_state == WebSocketState.CONNECTING:
message = await self._receive()
message_type = message["type"]
if message_type != "websocket.connect":
raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {message_type!r}')
self.client_state = WebSocketState.CONNECTED
return message
elif self.client_state == WebSocketState.CONNECTED:
message = await self._receive()
message_type = message["type"]
if message_type not in {"websocket.receive", "websocket.disconnect"}:
raise RuntimeError(
f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {message_type!r}'
)
if message_type == "websocket.disconnect":
self.client_state = WebSocketState.DISCONNECTED
return message
else:
raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')
|
send 异步
发送 ASGI WebSocket 消息,确保状态转换有效。
源代码位于 starlette/websockets.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 | async def send(self, message: Message) -> None:
"""
Send ASGI websocket messages, ensuring valid state transitions.
"""
if self.application_state == WebSocketState.CONNECTING:
message_type = message["type"]
if message_type not in {"websocket.accept", "websocket.close", "websocket.http.response.start"}:
raise RuntimeError(
'Expected ASGI message "websocket.accept", "websocket.close" or "websocket.http.response.start", '
f"but got {message_type!r}"
)
if message_type == "websocket.close":
self.application_state = WebSocketState.DISCONNECTED
elif message_type == "websocket.http.response.start":
self.application_state = WebSocketState.RESPONSE
else:
self.application_state = WebSocketState.CONNECTED
await self._send(message)
elif self.application_state == WebSocketState.CONNECTED:
message_type = message["type"]
if message_type not in {"websocket.send", "websocket.close"}:
raise RuntimeError(
f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
)
if message_type == "websocket.close":
self.application_state = WebSocketState.DISCONNECTED
try:
await self._send(message)
except OSError:
self.application_state = WebSocketState.DISCONNECTED
raise WebSocketDisconnect(code=1006)
elif self.application_state == WebSocketState.RESPONSE:
message_type = message["type"]
if message_type != "websocket.http.response.body":
raise RuntimeError(f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}')
if not message.get("more_body", False):
self.application_state = WebSocketState.DISCONNECTED
await self._send(message)
else:
raise RuntimeError('Cannot call "send" once a close message has been sent.')
|
accept 异步
accept(subprotocol=None, headers=None)
源代码位于 starlette/websockets.py
99
100
101
102
103
104
105
106
107
108
109 | async def accept(
self,
subprotocol: str | None = None,
headers: typing.Iterable[tuple[bytes, bytes]] | None = None,
) -> None:
headers = headers or []
if self.client_state == WebSocketState.CONNECTING: # pragma: no branch
# If we haven't yet seen the 'connect' message, then wait for it first.
await self.receive()
await self.send({"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers})
|
receive_text 异步
源代码位于 starlette/websockets.py
| async def receive_text(self) -> str:
if self.application_state != WebSocketState.CONNECTED:
raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
message = await self.receive()
self._raise_on_disconnect(message)
return typing.cast(str, message["text"])
|
receive_bytes 异步
源代码位于 starlette/websockets.py
| async def receive_bytes(self) -> bytes:
if self.application_state != WebSocketState.CONNECTED:
raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
message = await self.receive()
self._raise_on_disconnect(message)
return typing.cast(bytes, message["bytes"])
|
receive_json 异步
receive_json(mode='text')
源代码位于 starlette/websockets.py
129
130
131
132
133
134
135
136
137
138
139
140
141 | async def receive_json(self, mode: str = "text") -> typing.Any:
if mode not in {"text", "binary"}:
raise RuntimeError('The "mode" argument should be "text" or "binary".')
if self.application_state != WebSocketState.CONNECTED:
raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
message = await self.receive()
self._raise_on_disconnect(message)
if mode == "text":
text = message["text"]
else:
text = message["bytes"].decode("utf-8")
return json.loads(text)
|
iter_text 异步
源代码位于 starlette/websockets.py
| async def iter_text(self) -> typing.AsyncIterator[str]:
try:
while True:
yield await self.receive_text()
except WebSocketDisconnect:
pass
|
iter_bytes 异步
源代码位于 starlette/websockets.py
| async def iter_bytes(self) -> typing.AsyncIterator[bytes]:
try:
while True:
yield await self.receive_bytes()
except WebSocketDisconnect:
pass
|
iter_json 异步
源代码位于 starlette/websockets.py
| async def iter_json(self) -> typing.AsyncIterator[typing.Any]:
try:
while True:
yield await self.receive_json()
except WebSocketDisconnect:
pass
|
send_text 异步
源代码位于 starlette/websockets.py
| async def send_text(self, data: str) -> None:
await self.send({"type": "websocket.send", "text": data})
|
send_bytes 异步
源代码位于 starlette/websockets.py
| async def send_bytes(self, data: bytes) -> None:
await self.send({"type": "websocket.send", "bytes": data})
|
send_json 异步
send_json(data, mode='text')
源代码位于 starlette/websockets.py
170
171
172
173
174
175
176
177 | async def send_json(self, data: typing.Any, mode: str = "text") -> None:
if mode not in {"text", "binary"}:
raise RuntimeError('The "mode" argument should be "text" or "binary".')
text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
if mode == "text":
await self.send({"type": "websocket.send", "text": text})
else:
await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
|
close 异步
close(code=1000, reason=None)
源代码位于 starlette/websockets.py
| async def close(self, code: int = 1000, reason: str | None = None) -> None:
await self.send({"type": "websocket.close", "code": code, "reason": reason or ""})
|
当客户端断开连接时,会引发 `WebSocketDisconnect` 异常,你可以捕获它。
你可以直接从 `fastapi` 导入它。
from fastapi import WebSocketDisconnect
fastapi.WebSocketDisconnect
WebSocketDisconnect(code=1000, reason=None)
继承:Exception
源代码位于 starlette/websockets.py
| def __init__(self, code: int = 1000, reason: str | None = None) -> None:
self.code = code
self.reason = reason or ""
|
WebSockets - 附加类
用于处理 WebSockets 的附加类。
它们由 Starlette 直接提供,但你可以从 `fastapi.websockets` 导入它们。
from fastapi.websockets import WebSocketDisconnect, WebSocketState
fastapi.websockets.WebSocketDisconnect
WebSocketDisconnect(code=1000, reason=None)
继承:Exception
源代码位于 starlette/websockets.py
| def __init__(self, code: int = 1000, reason: str | None = None) -> None:
self.code = code
self.reason = reason or ""
|
fastapi.websockets.WebSocketState