1. 程式人生 > 其它 >FastAPI安全系列(一) 基於Password和Bearer Token的OAuth2 .0認證

FastAPI安全系列(一) 基於Password和Bearer Token的OAuth2 .0認證

一、獲取username和password

 後臺獲取前臺提交的username和password,可以使用FastAPI的安全實用性工具獲取username和password。

 OAuth2規定客戶端必需將username和password欄位作為表單資料傳送(不可使用JSON)。而且規範了欄位必須這樣命名。

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm =
Depends()): pass

OAuth2PasswordRequestForm是一個類依賴項,聲明瞭如下的請求表單:

  • username
  • password
  • 可選scope欄位,是一個由空格分隔的字串組成的大字串
  • 可選的grant_type欄位
  • 可選的client_id欄位
  • 可選的 client_secret欄位

現在使用表單中的username從(偽)資料庫中獲取使用者資料,如果沒有這個使用者就返回一個錯誤。若是獲取到這個使用者,先進行Pydantic UserInDB模型,然後進行密碼密文校驗。

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from typing import Optional app = FastAPI() # 偽資料庫 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "[email protected]", "hashed_password
": "fakehashedsecret", "disabled": False, }, "alice": { "username": "alice", "full_name": "Alice Wonderson", "email": "[email protected]", "hashed_password": "fakehashedsecret2", "disabled": True, }, } def fake_hash_password(password: str): return "fakehashed" + password class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): user_dict = fake_users_db.get(form_data.username) # 從資料庫中取出使用者資訊 if not user_dict: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") user = UserInDB(**user_dict) # 進行使用者模型校驗 hashed_password = fake_hash_password(form_data.password) # 將表單密碼進行hash加密 if not hashed_password == user.hashed_password: # 表單加密後的密碼與資料庫中的密碼進行比對 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!") return {"access_token": user.username, "token_type": "bearer"}

最後返回一個JSON資料的token令牌,該JSON物件包含:

  • token_type 因為例項中使用了Bearer令牌,所以令牌型別為Bearer
  • access_token 令牌字串

二、獲取活躍使用者資訊

這裡已經完成了令牌的獲取功能,與之前的獲取當前使用者的功能進行合併:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

oauth2_schema = OAuth2PasswordBearer(tokenUrl="/token") # 執行生成token的地址

# 偽資料庫
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str):
    return "fakehashed" + password


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)  # 從資料庫中取出使用者資訊
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    user = UserInDB(**user_dict)  # 進行使用者模型校驗
    hashed_password = fake_hash_password(form_data.password)  # 將表單密碼進行hash加密
    if not hashed_password == user.hashed_password:  # 表單加密後的密碼與資料庫中的密碼進行比對
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    return {"access_token": user.username, "token_type": "bearer"}


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token: str):
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_schema)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authenticate credentials",
            headers={"WWW-Authentiacate": "Bearer"}
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
    return current_user


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

在生成token的基礎上進行活躍使用者資訊的獲取:

  • 首先需要將生成token的地址進行指向實際生成token的路徑操作函式
  • 獲取活躍使用者請求時就會去獲取認證的token
  • 一旦獲取到token就與正常的API請求一樣

具體的表現是在互動文件中:

  • 先點選右上角的Authorize按鈕進行認證,此時後臺會生成一個JSON物件,內含access_token欄位
  • 前臺獲取到 這個access_token,將其存到某處
  • 點選下面的路徑操作,如獲取活躍使用者的API,會在請求頭中攜帶該token進行認證,以及獲取相應的使用者資訊