現(xiàn)在讓我們接著上一章繼續(xù)開發(fā),并添加缺少的部分以實現(xiàn)一個完整的安全性流程。
我們將使用 FastAPI 的安全性實用工具來獲取 username 和 password。
OAuth2 規(guī)定在使用(我們打算用的)「password 流程」時,客戶端/用戶必須將 username 和 password 字段作為表單數(shù)據(jù)發(fā)送。
而且規(guī)范明確了字段必須這樣命名。因此 user-name 或 email 是行不通的。
不過不用擔心,你可以在前端按照你的想法將它展示給最終用戶。
而且你的數(shù)據(jù)庫模型也可以使用你想用的任何其他名稱。
但是對于登錄路徑操作,我們需要使用這些名稱來與規(guī)范兼容(以具備例如使用集成的 API 文檔系統(tǒng)的能力)。
規(guī)范還寫明了 username 和 password 必須作為表單數(shù)據(jù)發(fā)送(因此,此處不能使用 JSON)。
規(guī)范還提到客戶端可以發(fā)送另一個表單字段「scope」。
這個表單字段的名稱為 scope(單數(shù)形式),但實際上它是一個由空格分隔的「作用域」組成的長字符串。
每個「作用域」只是一個字符串(中間沒有空格)。
它們通常用于聲明特定的安全權(quán)限,例如:
Info
在 OAuth2 中「作用域」只是一個聲明所需特定權(quán)限的字符串。
它有沒有 : 這樣的其他字符或者是不是 URL 都沒有關(guān)系。
這些細節(jié)是具體的實現(xiàn)。
對 OAuth2 來說它們就只是字符串而已。
現(xiàn)在,讓我們使用 FastAPI 提供的實用工具來處理此問題。
首先,導入 OAuth2PasswordRequestForm,然后在 token 的路徑操作中通過 Depends 將其作為依賴項使用。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
OAuth2PasswordRequestForm 是一個類依賴項,聲明了如下的請求表單:
Tip
OAuth2 規(guī)范實際上要求 grant_type 字段使用一個固定的值 password,但是 OAuth2PasswordRequestForm 沒有作強制約束。
如果你需要強制要求這一點,請使用 OAuth2PasswordRequestFormStrict 而不是 OAuth2PasswordRequestForm。
Info
OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一樣是 FastAPI 的一個特殊的類。
OAuth2PasswordBearer 使得 FastAPI 明白它是一個安全方案。所以它得以通過這種方式添加到 OpenAPI 中。
但 OAuth2PasswordRequestForm 只是一個你可以自己編寫的類依賴項,或者你也可以直接聲明 Form 參數(shù)。
但是由于這是一種常見的使用場景,因此 FastAPI 出于簡便直接提供了它。
Tip
類依賴項 OAuth2PasswordRequestForm 的實例不會有用空格分隔的長字符串屬性 scope,而是具有一個 scopes 屬性,該屬性將包含實際被發(fā)送的每個作用域字符串組成的列表。
在此示例中我們沒有使用 scopes,但如果你需要的話可以使用該功能。
現(xiàn)在,使用表單字段中的 username 從(偽)數(shù)據(jù)庫中獲取用戶數(shù)據(jù)。
如果沒有這個用戶,我們將返回一個錯誤消息,提示「用戶名或密碼錯誤」。
對于這個錯誤,我們使用 HTTPException 異常:
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
目前我們已經(jīng)從數(shù)據(jù)庫中獲取了用戶數(shù)據(jù),但尚未校驗密碼。
讓我們首先將這些數(shù)據(jù)放入 Pydantic UserInDB 模型中。
永遠不要保存明文密碼,因此,我們將使用(偽)哈希密碼系統(tǒng)。
如果密碼不匹配,我們將返回同一個錯誤。
「哈希」的意思是:將某些內(nèi)容(在本例中為密碼)轉(zhuǎn)換為看起來像亂碼的字節(jié)序列(只是一個字符串)。
每次你傳入完全相同的內(nèi)容(完全相同的密碼)時,你都會得到完全相同的亂碼。
但是你不能從亂碼轉(zhuǎn)換回密碼。
如果你的數(shù)據(jù)庫被盜,小偷將無法獲得用戶的明文密碼,只有哈希值。
因此,小偷將無法嘗試在另一個系統(tǒng)中使用這些相同的密碼(由于許多用戶在任何地方都使用相同的密碼,因此這很危險)。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
UserInDB(**user_dict) 表示:
直接將 user_dict 的鍵和值作為關(guān)鍵字參數(shù)傳遞,等同于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
Info
有關(guān) user_dict 的更完整說明,請參閱額外的模型文檔。
token 端點的響應(yīng)必須是一個 JSON 對象。
它應(yīng)該有一個 token_type。在我們的例子中,由于我們使用的是「Bearer」令牌,因此令牌類型應(yīng)為「bearer」。
并且還應(yīng)該有一個 access_token 字段,它是一個包含我們的訪問令牌的字符串。
對于這個簡單的示例,我們將極其不安全地返回相同的 username 作為令牌。
Tip
在下一章中,你將看到一個真實的安全實現(xiàn),使用了哈希密碼和 JWT 令牌。
但現(xiàn)在,讓我們僅關(guān)注我們需要的特定細節(jié)。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Tip
根據(jù)規(guī)范,你應(yīng)該像本示例一樣,返回一個帶有 access_token 和 token_type 的 JSON。
這是你必須在代碼中自行完成的工作,并且要確保使用了這些 JSON 字段。
這幾乎是唯一的你需要自己記住并正確地執(zhí)行以符合規(guī)范的事情。
其余的,F(xiàn)astAPI 都會為你處理。
現(xiàn)在我們將更新我們的依賴項。
我們想要僅當此用戶處于啟用狀態(tài)時才能獲取 current_user。
因此,我們創(chuàng)建了一個額外的依賴項 get_current_active_user,而該依賴項又以 get_current_user 作為依賴項。
如果用戶不存在或處于未啟用狀態(tài),則這兩個依賴項都將僅返回 HTTP 錯誤。
因此,在我們的端點中,只有當用戶存在,身份認證通過且處于啟用狀態(tài)時,我們才能獲得該用戶:
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Info
我們在此處返回的值為 Bearer 的額外響應(yīng)頭 WWW-Authenticate 也是規(guī)范的一部分。
任何的 401「未認證」HTTP(錯誤)狀態(tài)碼都應(yīng)該返回 WWW-Authenticate 響應(yīng)頭。
對于 bearer 令牌(我們的例子),該響應(yīng)頭的值應(yīng)為 Bearer。
實際上你可以忽略這個額外的響應(yīng)頭,不會有什么問題。
但此處提供了它以符合規(guī)范。
而且,(現(xiàn)在或?qū)恚┛赡軙泄ぞ咂谕玫讲⑹褂盟?,然后對你或你的用戶有用處?/p>
這就是遵循標準的好處...
打開交互式文檔:http://127.0.0.1:8000/docs。
點擊「Authorize」按鈕。
使用以下憑證:
用戶名:johndoe
密碼:secret
在系統(tǒng)中進行身份認證后,你將看到:
現(xiàn)在執(zhí)行 /users/me 路徑的 GET 操作。
你將獲得你的用戶數(shù)據(jù),如:
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false,
"hashed_password": "fakehashedsecret"
}
如果你點擊鎖定圖標并注銷,然后再次嘗試同一操作,則會得到 HTTP 401 錯誤:
{
"detail": "Not authenticated"
}
現(xiàn)在嘗試使用未啟用的用戶,并通過以下方式進行身份認證:
用戶名:alice
密碼:secret2
然后嘗試執(zhí)行 /users/me 路徑的 GET 操作。
你將得到一個「未啟用的用戶」錯誤,如:
{
"detail": "Inactive user"
}
現(xiàn)在你掌握了為你的 API 實現(xiàn)一個基于 username 和 password 的完整安全系統(tǒng)的工具。
使用這些工具,你可以使安全系統(tǒng)與任何數(shù)據(jù)庫以及任何用戶或數(shù)據(jù)模型兼容。
唯一缺少的細節(jié)是它實際上還并不「安全」。
在下一章中,你將看到如何使用一個安全的哈希密碼庫和 JWT 令牌。
更多建議: