1. 程式人生 > 其它 >FastAPI(59)- 詳解使用 OAuth2PasswordBearer + JWT 認證

FastAPI(59)- 詳解使用 OAuth2PasswordBearer + JWT 認證

JWT

  • JSON Web Tokens
  • 它是一個將 JSON 物件編碼為密集且沒有空格的長字串的標準
  • 使用 JWT token 和安全密碼 hash 使應用程式真正安全

JWT 小栗子

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
  • 它還沒有加密,因此任何人都可以從該字串中恢復資訊
  • 但是已經加簽了,因此,當收到發出的 token 時,可以驗證是否實際發出了它
  • 建立一個有效期為 1 周的 token,然後當用戶第二天帶著 token 回來時,知道該使用者仍然登入到系統中
  • 一週後,令牌將過期,使用者將無法獲得授權,必須重新登入以獲取新的 token
  • 如果使用者(或第三方)試圖修改 token 以更改過期時間,將能夠發現它,因為簽名不匹配

前提

需要安裝 python-jose 來在 Python 中生成和驗證 JWT token

pip install python-jose
pip install cryptography

JWT 流程

  • 前端登入提交使用者名稱、密碼
  • 後端拿到使用者名稱、密碼進行驗證,如果沒問題,則返回token
  • 前端訪問需要認證的url 時攜帶token
  • 後端拿到token 進行驗證
  • 驗證通過返回使用者資訊及訪問的url 資訊

hash 密碼

前提

  • 資料庫儲存的密碼不能是明文的,需要加密
  • PassLib 是一個用於處理雜湊密碼的包
  • 推薦的演算法是 「Bcrypt
pip install passlib
pip install bcrypt

包含的功能

  • hash 密碼
  • 驗證 hash 密碼是否一致
  • 通過使用者名稱、密碼驗證使用者

hash 密碼

# 匯入 CryptContext
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=['
bcrypt'], deprecated="auto") # 密碼加密 def hash_password(password: str) -> str: return pwd_context.hash(password)

驗證 hash 密碼是否一致

# 驗證密碼
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

通過使用者名稱、密碼驗證使用者

# 模擬從資料庫中根據使用者名稱查詢使用者
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 根據使用者名稱、密碼來驗證使用者
def authenticate_user(db, username: str, password: str):
    # 1、通過使用者名稱模擬去資料庫查詢使用者
    user = get_user(db, username)
    if not user:
        # 2、使用者不存在
        return False
    if not verify_password(password, user.hashed_password):
        # 3、密碼驗證失敗
        return False
    # 4、驗證通過,返回使用者資訊
    return user

處理 JWT token

生成用於簽名 JWT token 的隨機金鑰

在命令列敲

> openssl rand -hex 32
dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c

常量池

方便後續複用

# 常量池
# 通過 openssl rand -hex 32 生成的隨機金鑰
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密演算法
ALGORITHM = "HS256"
# 過期時間,分鐘
ACCESS_TOKEN_EXPIRE_MINUTES = 30

建立生成 JWT token 需要用的 Pydantic Model

其實不建立也沒事,但這裡為了規範和資料校驗功能,還是建吧

# 返回給客戶端的 Token Model
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None

生成 JWT token

# 匯入 JWT 相關庫
from jose import JWTError, jwt


# 使用者名稱、密碼驗證成功後,生成 token
def create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

修改 get_current_user

獲取 token 後解碼並獲取使用者

# 匯入 JWT 相關庫
from jose import JWTError, jwt

# 根據當前使用者的 token 獲取使用者,token 已失效則返回錯誤碼
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 1、解碼收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        # 2、拿到 username
        username: str = payload.get("sub")
        if not username:
            # 3、若 token 失效,則返回錯誤碼
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # 4、獲取使用者
    user = get_user(fake_users_db, username=token_data.username)
    if not user:
        raise credentials_exception
    # 5、返回使用者
    return user

修改獲取 token 的路徑操作函式

# OAuth2 獲取 token 的請求路徑
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、獲取客戶端傳過來的使用者名稱、密碼
    username = form_data.username
    password = form_data.password
    # 2、驗證使用者
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、驗證失敗,返回錯誤碼
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 響應
    return {"access_token": access_token, "token_type": "bearer"}

sub 的是什麼?

  • JWT 規範中有一個 sub key,子健
  • 它是可選的,這裡的作用是通過使用者名稱設定使用者標識
  • 子健應該在整個應用程式中具有唯一的識別符號,並且它應該是一個字串

完整的程式碼

#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠蘿測試筆記
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py
"""
from typing import Optional

import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta
# 匯入 CryptContext
from passlib.context import CryptContext
# 匯入 JWT 相關庫
from jose import JWTError, jwt

# 常量池
# 通過 openssl rand -hex 32 生成的隨機金鑰
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密演算法
ALGORITHM = "HS256"
# 過期時間,分鐘
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 模擬資料庫
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


# 返回給客戶端的 User Model,不需要包含密碼
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


# 繼承 User,用於密碼驗證,所以要包含密碼
class UserInDB(User):
    hashed_password: str


# 獲取 token 路徑操作函式的響應模型
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


# 例項物件池
app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")


# 密碼加密
def hash_password(password: str) -> str:
    return pwd_context.hash(password)


# 驗證密碼
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# 模擬從資料庫中根據使用者名稱查詢使用者
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# 根據使用者名稱、密碼來驗證使用者
def authenticate_user(db, username: str, password: str):
    # 1、通過使用者名稱模擬去資料庫查詢使用者
    user = get_user(db, username)
    if not user:
        # 2、使用者不存在
        return False
    if not verify_password(password, user.hashed_password):
        # 3、密碼驗證失敗
        return False
    # 4、驗證通過,返回使用者資訊
    return user


# 使用者名稱、密碼驗證成功後,生成 token
def create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# OAuth2 獲取 token 的請求路徑
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、獲取客戶端傳過來的使用者名稱、密碼
    username = form_data.username
    password = form_data.password
    # 2、驗證使用者
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、驗證失敗,返回錯誤碼
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 響應
    return {"access_token": access_token, "token_type": "bearer"}


# 根據當前使用者的 token 獲取使用者,token 已失效則返回錯誤碼
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 1、解碼收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        # 2、拿到 username
        username: str = payload.get("sub")
        if not username:
            # 3、若 token 失效,則返回錯誤碼
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # 4、獲取使用者
    user = get_user(fake_users_db, username=token_data.username)
    if not user:
        raise credentials_exception
    # 5、返回使用者
    return user


# 判斷使用者是否活躍,活躍則返回,不活躍則返回錯誤碼
async def get_current_active_user(user: User = Depends(get_current_user)):
    if user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")
    return user


# 獲取當前使用者資訊
@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):
    return user


# 正常的請求
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}


if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)

請求結果