fromfastapiimportFastAPI,HTTPExceptionapp=FastAPI()items={"foo":"The Foo Wrestlers"}@app.get("/items/{item_id}")asyncdefread_item(item_id:str):ifitem_idnotinitems:raiseHTTPException(status_code=404,detail="Item not found")return{"item":items[item_id]}
def__init__(self,status_code:Annotated[int,Doc(""" HTTP status code to send to the client. """),],detail:Annotated[Any,Doc(""" Any data to be sent to the client in the `detail` key of the JSON response. """),]=None,headers:Annotated[Optional[Dict[str,str]],Doc(""" Any headers to send to the client in the response. """),]=None,)->None:super().__init__(status_code=status_code,detail=detail,headers=headers)
fromtypingimportAnnotatedfromfastapiimport(Cookie,FastAPI,WebSocket,WebSocketException,status,)app=FastAPI()@app.websocket("/items/{item_id}/ws")asyncdefwebsocket_endpoint(*,websocket:WebSocket,session:Annotated[str|None,Cookie()]=None,item_id:str,):ifsessionisNone:raiseWebSocketException(code=status.WS_1008_POLICY_VIOLATION)awaitwebsocket.accept()whileTrue:data=awaitwebsocket.receive_text()awaitwebsocket.send_text(f"Session cookie is: {session}")awaitwebsocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
def__init__(self,code:Annotated[int,Doc(""" A closing code from the [valid codes defined in the specification](https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1). """),],reason:Annotated[Union[str,None],Doc(""" The reason to close the WebSocket connection. It is UTF-8-encoded data. The interpretation of the reason is up to the application, it is not specified by the WebSocket specification. It could contain text that could be human-readable or interpretable by the client code, etc. """),]=None,)->None:super().__init__(code=code,reason=reason)