1. 程式人生 > 實用技巧 >FastAPI 安全機制(四) OAuth2 scopes

FastAPI 安全機制(四) OAuth2 scopes

作者:麥克煎蛋 出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段宣告,謝謝!

OAuth2 scopes是一種細粒度的安全許可機制,通常用來對使用者或者第三方應用提供特定的訪問許可。

在OAuth2的規範中,scopes是一個基於空格分隔符的字串列表。這些scopes代表著"許可"。

每一個scope項是一個不帶空格的字串,通常用來表示特定的安全許可,例如:

  • users:read 或者 users:write:這是通常的使用場景
  • instagram_basic: Facebook / Instagram的使用場景
  • https://www.googleapis.com/auth/drive
    : Google使用場景

scope的具體內容根據業務需求而定,對OAuth2來說只是字串。

我們可以在FastAPI中直接使用無縫整合的OAuth2 scopes。

一、通過token返回scopes資訊

1、後臺獲取許可權

通常情況下,使用者登陸成功以後,我們可以獲取到使用者真實的許可權,並通過token返回scopes資訊。

示例中的scopes資訊僅是演示,使用時應該根據使用者的實際系統許可權來賦值。

@app.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    
# 首先校驗使用者資訊 user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, )
# 生成並返回token資訊 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username, "scopes": "me"}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}

2、使用者登陸時選擇

使用者也可以在登陸時選擇scopes資訊,這也是Google等登陸時所用的機制。

我們需要在OAuth2PasswordBearer中新增scopes資訊。

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

同樣我們也是通過token返回scopes資訊,只不過scopes資訊的來源是使用者端。

@app.post("/login2", response_model=Token)
async def login_for_access_token2(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # 首先校驗使用者資訊
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 生成並返回token資訊
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

我們可以在互動式文件中檢視顯示效果:

二、scopes許可權校驗

當有請求訪問時,需要從token中解析出有效資訊,不僅需要完成使用者身份校驗,還需要完成基於scopes的許可權校驗。

1、scopes資料傳遞

首先需要在路徑操作中新增對scopes的依賴項:

@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    return current_user

Security實際上是Depends的子類,只不過多了一個引數,可以接收scopes的列表資訊。

通過使用Security而不是Depends,FastAPI將會知道它會宣告並內部使用scopes資訊,並且在互動式文件中顯示這些資訊。

2、scopes資料解析

SecurityScopes的屬性scopes,是一個包含所有它需要的scopes以及所有依賴項(把它作為子依賴項)的列表。

SecurityScopes的屬性scope_str,是包含所有scopes的一個字串(以空格分隔)。

class SecurityScopes:
    def __init__(self, scopes: List[str] = None):
        self.scopes = scopes or []
        self.scope_str = " ".join(self.scopes)
async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = f"Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        print(username)

        if username is None:
            raise credentials_exception

        # 讀取scopes資訊
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
except (PyJWTError, ValidationError):
        raise credentials_exception

    # 使用者身份校驗
    user = DBUser.get_by_username(db, token_data.username)
    if user is None:
        raise credentials_exception

    # 基於scope的許可權校驗
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    return user

3、依賴項樹和scopes

修改部分程式碼邏輯如下:

async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: User = Security(get_current_active_user, scopes=["me"])
):
    return current_user


@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"])
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}

關於依賴項和scopes的層次提下如下:

路徑操作read_own_items有:

  * 依賴項需要的scopes: ["items"]

  * 依賴項函式 get_current_active_user:

    * 依賴項需要的scopes: ["me"]

    * 依賴項函式 get_current_user:

        * 自身不需要scopes

        * 依賴項oauth2_scheme

        *SecurityScopes型別的引數security_scopes

          * 引數security_scopes的屬性scopes包含所有以上宣告的scopes的一個列表,因此

            * 對於路徑操作 read_own_items來說,security_scopes.scopes 包含 ["me", "items"]

            * 對於路徑操作 read_users_me來說,security_scopes.scopes 包含 ["me"]

            * 對於路徑操作 read_system_status來說,security_scopes.scopes 的內容為[]