1. 程式人生 > 其它 >drf(十一) jwt的原理及使用

drf(十一) jwt的原理及使用

JWT的原理及使用

介紹:

# jwt 一般用於使用者認證(前後端分離,微信小程式,uniapp)的開發
json web token

認證流程。

1. 區別

  • 傳統認證

    使用者登入,服務端返回token,並將token儲存在服務端
    以後使用者再來訪問,需要攜帶token,服務端獲取token後,再去資料庫中獲取token進行校驗。
    
  • jwt認證

    使用者登入,服務端返回一個token(服務端不儲存)
    以後使用者再來訪問,需要攜帶token,服務端獲取token後,再做token校驗
    
    優勢:相較於傳統的token相比,它無需在服務端儲存token
    

2. jwt 實現原理

jwt官網示例

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

注意:jwt 生成的 token 是由三段字串組成,並使用.進行連線

  • 第一段字串,Header 內部包含演算法/token 型別

    json 轉化為字串,然後做base64 url加密(base64url加密;+_)

    {
      "alg": "HS256",
      "typ": "JWT"
    }
    
  • 第二段字串,pyload 自定義值。

    json 轉化為字串,然後做base64 url加密(base64url加密;+_)

    {
      "id": "1234567890",  //可以傳入使用者id
      "name": "John Doe", // 可以自定義值,使用者名稱
      "iat": 1516239022 // 失效時間
    }
    
    /*注:一般不用傳入使用者密碼,否則有洩露的危險。*/
    
  • 第三段字串

    第一步:將第一步和第二步的密文進行拼接
    eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
    第二步:對前兩部分的密文進行HS256加密 + 加鹽
    第三步:對HS256加密的密文再做base64url加密
    
  • 以後使用者再來訪問的時候,需要攜帶 token,後端對 token 進行校驗。

    • 獲取token

    • 第一步:對 token 進行切割

      eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
      
    • 第二步:對第二段進行base64url解密,並獲取 payload 資訊,檢測 token 是否已經超時。

      {
        "id": "1234567890",  //可以傳入使用者id
        "name": "John Doe", // 可以自定義值,使用者名稱
        "iat": 1516239022 // 超時時間
      }
      
    • 第三步:把第1,2 段的內容拼接再次執行 HASH256加密。

      第一步:將第一步和第二步的密文進行拼接
      eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
      第二步:對前兩部分的密文進行HS256加密 + 加鹽
      
      密文=base64解密(SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c)
      如果相等,表示token 未被修改過(認證通過)
      
    • 說明:請保證的保密性。

3. 指令碼使用

3.1 jwt 加密使用

import datetime

import jwt
from jwt import exceptions

SALT="ABCDEF"
def create_token():
    # 構造headers
    headers={
        "alg": "HS256",
        "typ": "JWT"
    }
    pyload={
        'id':1,
        'name':'ziqingbaojian',
        'iat':datetime.datetime.now()+datetime.timedelta(days=1) #有效期一天
    }
    result=jwt.encode(payload=pyload,key=SALT,algorithm="HS256",headers=headers)
    return result

3.2 jwt 解密

def get_payload(token):
    '''
    根據token獲取payload
    :param token:
    :return:
    '''
    try:
        verified_payload = jwt.decode(token, SALT, ['HS256'])
        return verified_payload
    except exceptions.DecodeError:
        print("token 認證失敗")
    except exceptions.ExpiredSignatureError:
        print("token 已失效")
    except exceptions.InvalidTokenError:
        print("非法的token")
        
if __name__ == '__main__':
    token=create_token()
    print(token) #生成 token
    payload=get_payload(token)
    print(payload) # 生成解密資料

4. jwt原始碼淺讀

說明:原理在上述已經介紹完畢

# 使用encode方法完成加密。
result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers)
class PyJwT():
    pass
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
decode_complete = _jwt_global_obj.decode_complete
decode = _jwt_global_obj.decode

檢視原始碼得知,_jwt_global_obj 使用了單利模式的設計模式。encode 呼叫物件方法。

def encode(
    self,
    payload: Dict[str, Any],
    key: str,
    algorithm: Optional[str] = "HS256",
    headers: Optional[Dict] = None,
    json_encoder: Optional[Type[json.JSONEncoder]] = None,
) -> str:
    # Check that we get a mapping
    if not isinstance(payload, Mapping):
        raise TypeError(
            "Expecting a mapping object, as JWT only supports "
            "JSON objects as payloads."
        )

    # Payload
    payload = payload.copy()
    for time_claim in ["exp", "iat", "nbf"]:
        # 將datetime轉換為已知時間格式宣告中的intDate值
        # Convert datetime to a intDate value in known time-format claims
        '''因此,時間的有效值鍵可以是這三個值'''
        if isinstance(payload.get(time_claim), datetime):
            payload[time_claim] = timegm(payload[time_claim].utctimetuple())

    json_payload = json.dumps(
        payload, separators=(",", ":"), cls=json_encoder
    ).encode("utf-8")

    return api_jws.encode(json_payload, key, algorithm, headers, json_encoder) #執行 api_jws.encode 方法。
def encode(
    self,
    payload: bytes,
    key: str,
    algorithm: Optional[str] = "HS256",
    headers: Optional[Dict] = None,
    json_encoder: Optional[Type[json.JSONEncoder]] = None,
) -> str:
    segments = []

    if algorithm is None:
        algorithm = "none"

    # Prefer headers["alg"] if present to algorithm parameter.
    if headers and "alg" in headers and headers["alg"]:
        algorithm = headers["alg"]

    # Header
    header = {"typ": self.header_typ, "alg": algorithm} # 預設頭部資訊

    if headers:
        self._validate_headers(headers)
        header.update(headers)
        if not header["typ"]:
            del header["typ"]

    json_header = json.dumps(
        header, separators=(",", ":"), cls=json_encoder
    ).encode() # 使用json格式化加密頭

    segments.append(base64url_encode(json_header)) # 使用 base64url進行加密
    segments.append(base64url_encode(payload))#使用base64url加密payload資料部分
	# 將兩次結果儲存到 segments 列表
    
    # Segments
    signing_input = b".".join(segments) # 使用`.`進行拼接兩端字串,作為待加密的字串
    try:
        alg_obj = self._algorithms[algorithm] # 獲取加密演算法的型別
        key = alg_obj.prepare_key(key)# 加鹽
        signature = alg_obj.sign(signing_input, key) # 使用加密演算法加鹽,並生成密文

    except KeyError as e:
        if not has_crypto and algorithm in requires_cryptography:
            raise NotImplementedError(
                "Algorithm '%s' could not be found. Do you have cryptography "
                "installed?" % algorithm
            ) from e
        else:
            raise NotImplementedError("Algorithm not supported") from e

    segments.append(base64url_encode(signature)) # 將base64url加密密文,並新增到列表中

    encoded_string = b".".join(segments) #是用`.`將三段密文進行拼接

    return encoded_string.decode("utf-8") # 將結果進行返回。

解密原始碼

def decode(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    # 解密時需要傳入的引數。
    
    # 執行方法decode_complete
    decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs)
    return decoded["payload"] #返回結果字典中的值

decode_complete()

def decode_complete(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    
    # 將verify_signature賦值為True
    if options is None: # 空字典賦值
        # 驗證簽名
        options = {"verify_signature": True}
    else:
        options.setdefault("verify_signature", True)

    if not options["verify_signature"]:
        options.setdefault("verify_exp", False)
        options.setdefault("verify_nbf", False)
        options.setdefault("verify_iat", False)
        options.setdefault("verify_aud", False)
        options.setdefault("verify_iss", False)

    if options["verify_signature"] and not algorithms:
        raise DecodeError(
            'It is required that you pass in a value for the "algorithms" argument when calling decode().'
        )
	# 執行該方法decode_complete
    decoded = api_jws.decode_complete(
        jwt,
        key=key,
        algorithms=algorithms,
        options=options,
        **kwargs,
    )

    try:
        payload = json.loads(decoded["payload"]) #使用json解析資料。
    except ValueError as e:
        raise DecodeError("Invalid payload string: %s" % e)
    if not isinstance(payload, dict):
        raise DecodeError("Invalid payload string: must be a json object")

    merged_options = {**self.options, **options}
    self._validate_claims(payload, merged_options, **kwargs)

    decoded["payload"] = payload
    return decoded

decode_complete()

def decode_complete(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    if options is None:
        options = {}
    merged_options = {**self.options, **options}
    verify_signature = merged_options["verify_signature"]

    if verify_signature and not algorithms:
        raise DecodeError(
            'It is required that you pass in a value for the "algorithms" argument when calling decode().'
        )
	# 執行私有返回資料
    payload, signing_input, header, signature = self._load(jwt)

    if verify_signature:
        self._verify_signature(signing_input, header, signature, key, algorithms)

    return {
        "payload": payload,
        "header": header,
        "signature": signature,
    }

_load();

def _load(self, jwt):
    if isinstance(jwt, str):
        jwt = jwt.encode("utf-8") #轉換編碼

    if not isinstance(jwt, bytes):
        raise DecodeError(f"Invalid token type. Token must be a {bytes}")

    try:
        # 使用字串分割(按照`.`),從最後一個點分割,獲取到兩個值;分別是一二部分和三部分
        signing_input, crypto_segment = jwt.rsplit(b".", 1)
        
        # 對前一個分割的前一部分進行再次分割,得到前兩部分的密文
        header_segment, payload_segment = signing_input.split(b".", 1)
    except ValueError as err:
        raise DecodeError("Not enough segments") from err

    try:
        header_data = base64url_decode(header_segment) #使用base64url解密請求頭
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid header padding") from err

    try:
        header = json.loads(header_data) # json解析
    except ValueError as e:
        raise DecodeError("Invalid header string: %s" % e) from e

    if not isinstance(header, Mapping):
        raise DecodeError("Invalid header string: must be a json object")

    try:
        payload = base64url_decode(payload_segment) #解密第二部分。自定義值的部分
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid payload padding") from err

    try:
        signature = base64url_decode(crypto_segment) 
        #將第三部分的密文進行base64url解密,得到HS256加密的密文
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid crypto padding") from err

    return (payload, signing_input, header, signature) #返回結果

補充:字串分割

rsplit()從右(尾部)面開始分割,第一個點作為分割元素

5.drf中使用jwt認證

5.1 檢視函式

from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
# Create your views here.
from app01 import models
from appjwt.utils import jwt_auth

class JwtView(APIView):
    authentication_classes = [] #注:登入函式不需要進行驗證。
    def post(self,request,*args,**kwargs):
        ret = {'code':1000,'msg':None} #初始化返回值
        try:
            user = request._request.POST.get('username')
            pwd = request._request.POST.get('password')
            # 往資料庫查詢引數
            obj = models.UserInfo.objects.filter(username=user,password=pwd).first()
            if not obj:# 使用者不存在
                ret['code'] = 1001
                ret['msg'] = "使用者名稱或密碼錯誤"
            # 為登入使用者建立token
            payload={
                "id":obj.id,
                "name":obj.username
            }
            token = jwt_auth.create_token(payload)# 使用預設的失效時間
            ret['token'] = token
        except Exception as e:
            ret['code'] = 1002
            ret['msg'] = '請求異常'
        return Response(ret)

5.2 認證類

說明:將解密 jwt 的程式碼封裝到認證類中,繼承 drf 中的認證類。

import jwt
from jwt import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.conf import settings

class JwtAuthentication(BaseAuthentication):
    def authenticate(self, request):
        token=request.query_params.get("token")
        SALT=settings.SECRET_KEY
        try:
            verified_payload = jwt.decode(token, SALT, ['HS256'])
        except exceptions.DecodeError:
            raise AuthenticationFailed({"1000":"token 認證失敗"})
        except exceptions.ExpiredSignatureError:
            raise AuthenticationFailed({'1001':"token 已失效"})
        except exceptions.InvalidTokenError:
            raise AuthenticationFailed({"1002":"非法token"})
        return (verified_payload,token)

    def authenticate_header(self, request):
        pass

5.3 生成token的檔案

import datetime

import jwt

from django.conf import settings

SALT=settings.SECRET_KEY
def create_token(payload,timeout=1):
    # 構造headers
    headers={
        "alg": "HS256",
        "typ": "JWT"
    }
    # 構造payload,預設失效時間是一分鐘
    payload['exp']=datetime.datetime.now()+datetime.timedelta(minutes=timeout)
    token=jwt.encode(payload=payload,key=SALT,algorithm="HS256",headers=headers)
    return token

配置檔案

REST_FRAMEWORK={
    "DEFAULT_AUTHENTICATION_CLASSES":['appjwt.utils.auth.JwtAuthentication',], #全域性認證類
    
    
    
    # "UNAUTHENTICATED_USER":None, # 匿名,request.user = None
    # "UNAUTHENTICATED_TOKEN":None,
    # "DEFAULT_PERMISSION_CLASSES":['app01.utils.permission.MyPermission',],
    # "DEFAULT_THROTTLE_CLASSES":['app01.utils.throttle.MyThrottle',],# 匿名使用者不能在全域性配置需要為登入功能單獨新增
    "DEFAULT_THROTTLE_RATES":{
        "visit":'3/m',#一分鐘三次,匿名使用者
        "loginuser":'10/m',# 登入成功,一分鐘10次
    },
    "PAGE_SIZE":2,
    "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
    "DEFAULT_VERSION":'v1',
    "ALLOWED_VERSIONS":['v1','v2'], #允許的版本號
    "VERSION_PARAM":"version",# 這個引數應該和 路由中的名稱相同version/
    "DEFAULT_PARSER_CLASSES":['rest_framework.parsers.JSONParser','rest_framework.parsers.FormParser'],
    "DEFAULT_RENDERER_CLASSES":['rest_framework.renderers.JSONRenderer','rest_framework.renderers.BrowsableAPIRenderer']
}

5.4 使用效果

登入生成 token

根據token查詢結果

6.擴充套件

pip install djangorestframework-jwt # 內部仍然呼叫的pyjwt

說明:djangorestframework-jwt 的使用僅限制在 drf 中,而 pyjwt 可以使用在任何框架中使用範圍較廣。

繼續努力,終成大器!