使用 Couchbase 的 NoSQL(分布式/大数据)数据库(已弃用)¶
"已弃用"
本教程已弃用,将在未来版本中删除。
FastAPI 也可以与任何 NoSQL 集成。
这里我们将看到使用 Couchbase 的示例,这是一个基于 文档 的 NoSQL 数据库。
您可以将其调整为任何其他 NoSQL 数据库,例如
- MongoDB
- Cassandra
- CouchDB
- ArangoDB
- ElasticSearch 等。
提示
有一个使用 FastAPI 和 Couchbase 的官方项目生成器,全部基于 Docker,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-couchbase
导入 Couchbase 组件¶
现在,请不要关注其他内容,只关注导入
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
定义一个用作“文档类型”的常量¶
我们稍后会将其用作文档中的固定字段 type
。
Couchbase 不要求这样做,但这是一种良好的做法,在后续操作中会对您有所帮助。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
添加一个获取 Bucket
的函数¶
在 Couchbase 中,一个 bucket 是一组文档,可以是不同类型的。
它们通常与同一个应用程序相关联。
在关系型数据库世界中的类比是“数据库”(一个特定的数据库,而不是数据库服务器)。
在 MongoDB 中的类比是“集合”。
在代码中,Bucket
代表与数据库进行通信的主要入口点。
此实用函数将
- 连接到 Couchbase 集群(可能是一台机器)。
- 设置超时时间的默认值。
- 在集群中进行身份验证。
- 获取
Bucket
实例。- 设置超时时间的默认值。
- 将其返回。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
创建 Pydantic 模型¶
由于 Couchbase 的“文档”实际上只是“JSON 对象”,因此我们可以使用 Pydantic 对其建模。
User
模型¶
首先,让我们创建一个 User
模型
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
我们将在我们的路径操作函数中使用这个模型,所以,我们不会在其中包含hashed_password
。
UserInDB
模型¶
现在,让我们创建一个UserInDB
模型。
这将包含实际存储在数据库中的数据。
我们不会将其创建为 Pydantic 的BaseModel
的子类,而是作为我们自己的User
的子类,因为它将包含User
中的所有属性以及另外几个属性。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
注意
请注意,我们有一个hashed_password
和一个type
字段,它们将存储在数据库中。
但它不是通用User
模型(将在路径操作中返回的模型)的一部分。
获取用户¶
现在创建一个函数,它将
- 获取一个用户名。
- 从它生成一个文档 ID。
- 获取具有该 ID 的文档。
- 将文档的内容放入
UserInDB
模型中。
通过创建一个仅用于从username
(或任何其他参数)获取用户(独立于你的路径操作函数)的函数,你可以更轻松地将其在多个部分重复使用,并为其添加单元测试
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
f-字符串¶
如果您不熟悉f"userprofile::{username}"
,它是一个 Python "f-字符串"。
任何放在 f-字符串中{}
内的变量都将在字符串中被扩展/注入。
dict
解包¶
如果您不熟悉UserInDB(**result.value)
,它使用了dict
“解包”.
它将获取result.value
处的dict
,并获取其每个键和值,并将它们作为键值对以关键字参数的形式传递给UserInDB
。
所以,如果dict
包含
{
"username": "johndoe",
"hashed_password": "some_hash",
}
它将被传递给UserInDB
作为
UserInDB(username="johndoe", hashed_password="some_hash")
创建你的FastAPI代码¶
创建FastAPI
应用¶
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
创建路径操作函数¶
由于我们的代码正在调用 Couchbase 并且我们没有使用实验性的 Python await
支持,我们应该使用正常的def
而不是async def
声明我们的函数。
此外,Couchbase 建议不要在一个Bucket
对象中使用多个"线程",所以,我们可以直接获取桶并将其传递给我们的实用程序函数。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
回顾¶
你可以集成任何第三方 NoSQL 数据库,只需使用它们的标准包。
同样适用于任何其他外部工具、系统或 API。