FastAPI安全系列(一) 基於Password和Bearer Token的OAuth2 .0認證
阿新 • • 發佈:2021-06-10
一、獲取username和password
後臺獲取前臺提交的username和password,可以使用FastAPI的安全實用性工具獲取username和password。
OAuth2規定客戶端必需將username和password欄位作為表單資料傳送(不可使用JSON)。而且規範了欄位必須這樣命名。
from fastapi import FastAPI, Depends from fastapi.security import OAuth2PasswordRequestForm app = FastAPI() @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm =Depends()): pass
OAuth2PasswordRequestForm是一個類依賴項,聲明瞭如下的請求表單:
- username
- password
- 可選scope欄位,是一個由空格分隔的字串組成的大字串
- 可選的grant_type欄位
- 可選的client_id欄位
- 可選的 client_secret欄位
現在使用表單中的username從(偽)資料庫中獲取使用者資料,如果沒有這個使用者就返回一個錯誤。若是獲取到這個使用者,先進行Pydantic UserInDB模型,然後進行密碼密文校驗。
from fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from typing import Optional app = FastAPI() # 偽資料庫 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "[email protected]", "hashed_password": "fakehashedsecret", "disabled": False, }, "alice": { "username": "alice", "full_name": "Alice Wonderson", "email": "[email protected]", "hashed_password": "fakehashedsecret2", "disabled": True, }, } def fake_hash_password(password: str): return "fakehashed" + password class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): user_dict = fake_users_db.get(form_data.username) # 從資料庫中取出使用者資訊 if not user_dict: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") user = UserInDB(**user_dict) # 進行使用者模型校驗 hashed_password = fake_hash_password(form_data.password) # 將表單密碼進行hash加密 if not hashed_password == user.hashed_password: # 表單加密後的密碼與資料庫中的密碼進行比對 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") return {"access_token": user.username, "token_type": "bearer"}
最後返回一個JSON資料的token令牌,該JSON物件包含:
- token_type 因為例項中使用了Bearer令牌,所以令牌型別為Bearer
- access_token 令牌字串
二、獲取活躍使用者資訊
這裡已經完成了令牌的獲取功能,與之前的獲取當前使用者的功能進行合併:
from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from typing import Optional app = FastAPI() oauth2_schema = OAuth2PasswordBearer(tokenUrl="/token") # 執行生成token的地址 # 偽資料庫 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "[email protected]", "hashed_password": "fakehashedsecret", "disabled": False, }, "alice": { "username": "alice", "full_name": "Alice Wonderson", "email": "[email protected]", "hashed_password": "fakehashedsecret2", "disabled": True, }, } def fake_hash_password(password: str): return "fakehashed" + password class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): user_dict = fake_users_db.get(form_data.username) # 從資料庫中取出使用者資訊 if not user_dict: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") user = UserInDB(**user_dict) # 進行使用者模型校驗 hashed_password = fake_hash_password(form_data.password) # 將表單密碼進行hash加密 if not hashed_password == user.hashed_password: # 表單加密後的密碼與資料庫中的密碼進行比對 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") return {"access_token": user.username, "token_type": "bearer"} def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def fake_decode_token(token: str): user = get_user(fake_users_db, token) return user async def get_current_user(token: str = Depends(oauth2_schema)): user = fake_decode_token(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authenticate credentials", headers={"WWW-Authentiacate": "Bearer"} ) return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user @app.get("/users/me") async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user
在生成token的基礎上進行活躍使用者資訊的獲取:
- 首先需要將生成token的地址進行指向實際生成token的路徑操作函式
- 獲取活躍使用者請求時就會去獲取認證的token
- 一旦獲取到token就與正常的API請求一樣
具體的表現是在互動文件中:
- 先點選右上角的Authorize按鈕進行認證,此時後臺會生成一個JSON物件,內含access_token欄位
- 前臺獲取到 這個access_token,將其存到某處
- 點選下面的路徑操作,如獲取活躍使用者的API,會在請求頭中攜帶該token進行認證,以及獲取相應的使用者資訊