1. 程式人生 > >API 接口認證與傳輸數據加密

API 接口認證與傳輸數據加密

目的 style 整數 req ngs 小數 params content 場景

應用場景

cmdb 這類項目的資產入庫等操作,當agent 與server 端通過api 進行數據交互時,為了安全采取了兩項安全措施:1、server 端需要對agent 端進行身份驗證(避免有冒充agent 請況);2、當agent 采集數據提交到server 端過程需要對數據加密,數據傳輸到agent 端時再數據解密(防止數據傳輸過程被截取,泄露數據)。

api 身份認證

認證原理

server 與 agent保存相同的字符串密鑰,agent 通過密鑰生成簽名值,把數據與簽名值發送給server 端,server 根據自己的配置文件保存的密鑰生成簽名值與agent 端簽名值比對,兩者一致則認證通過否則失敗。

安全加強(只針對數據傳輸中簽名被截取安全隱患,暫不考慮server與agent 本地保存的字符串密鑰安全隱患)

簽名值一次性

agent 每次發送過來的簽名值都會被server 記錄,每次發送的簽名值如果已經使用過則認證失敗(防止傳輸中被截取後黑客可以永久使用被截取的簽名密鑰來認證)

動態生成簽名

由於“簽名值一次性”所以簽名密鑰必須動態生成,每次用不同的密鑰。agent 每次生成簽名前生成一個時間值,把時間與保存的字符串密鑰組合在一起再生成一個簽名值。簽名值變成了動態。

簽名超時設置

agent 每次發送簽名與生成簽名的時間到server ,server 端接收時會再本段生成一個時間,將兩個時間值比較,當時間差超過10s ,則認證失敗。(“簽名值一次性”中對認證過的簽名值記錄在了字典中以{"client_time":"簽名密鑰"}形式存儲,為了避免此字典無線增大占用內存,所以會對字典中的時間判斷,超過10s 的數據進行刪除,但是刪除的簽名數據再有相同的簽名數據來認證時就不能通過“簽名一次性”來排除了,所以“簽名超時設置”解決了此問題)

代碼

agent 端

技術分享圖片
import requests
import time
import hashlib


def gen_sign(ctime):
    key = "uiakjsdfasjdf898"                                 #此字符串在server端也保存一份   
    val = "%s|%s" %(key,ctime)                           #加入時間,動態生成簽名密鑰
    obj = hashlib.md5()                                        #
生成一個對象 obj.update(val.encode(utf-8)) #md5只能對字節形式的數據進行加密 return obj.hexdigest() ctime = int(time.time()*1000) #*1000單位變ms,int 轉化為整數(去掉小數點後的數字,而不是四舍五入) result = requests.post( url = http://127.0.0.1:8000/api/test/, #註意路徑結尾一定更要以/結尾 params = {sign:gen_sign(ctime),ctime:ctime} #相當於在url = ‘http://.../?key="alskdjflskdfjkjk"‘,也就是在url中以get形式發送數據 ) print(result.url,result.text) #認證邏輯:agent 把簽名密鑰與生成簽名密鑰的時間一並發送到server端,server 端通過自身保存的key與agent 的時間生成簽名值,兩個簽名比對進行認證
View Code

server 端(view.py)

技術分享圖片
import json
import hashlib                                    #此模塊裏有md5的類
import time
from django.shortcuts import render,HttpResponse
from django.views import View
from django.conf import settings
from rest_framework.views import APIView
from rest_framework.response import Response

#生成簽名的函數
def gen_sign(ctime):
    """
    生成簽名
    :param ctime:
    :return:
    """
    val = %s|%s %(settings.URL_AUTH_KEY,ctime,)
    obj = hashlib.md5()                                #實力化一個md5對象
    obj.update(val.encode(utf-8))                    #md5對象只能對bytes格式數據加密
    return obj.hexdigest()                            #返回生成的簽名值
    
#所有的驗證成功的簽名密鑰都會記錄在SIGN_RECORD 中
SIGN_RECORD = {}

class TestView(APIView):
    def post(self,request):
        print("請求來了")
        #由於繼承了APIView所以此處的request 不是django原生的request,原生的為request._request
        client_sign = request._request.GET.get(sign)              #獲取agent發來的簽名密鑰
        client_ctime = int(request._request.GET.get(ctime))       #獲取agent 發來的時間
        server_time = int(time.time() * 1000)                       #記錄當前時間,*1000單位變成ms
        
        #認證第一關
        if server_time - client_ctime > 5000:                       #比對兩個時間,超過5s,則認證失敗
            return Response({status: false, error: "路上時間太久了"})
        # 認證第二關
        if client_sign in SIGN_RECORD:                              #已經認證通過的簽名會保存在SING_RECORD,新來密鑰如果被認證過了就會認證失敗
            return Response("簽名已經被使用過了")
        #認證第三關
        server_sign = gen_sign(client_ctime)                        #server 端生成簽名
        if client_sign != server_sign:                              #密鑰比對來通過認證
            return Response({status:false,error:403})
        #認證通過進行以下操作
        SIGN_RECORD[client_sign] = client_ctime                     #通過認證的密鑰存儲起來,下次防止密鑰重復使用

        #防止SIGN_RECORD無線增大,對於5s 之前的記錄刪除。對於已經認證通過並存儲在該記錄中的密鑰,超過10s刪除後,
        ## 那麽認證第二關就失效了,所以認證第一關就是為此而設
        for k in list(SIGN_RECORD.keys()):
            v = SIGN_RECORD[k]
            if server_time - v  > 5000:
                print("已經超過5s")
                del SIGN_RECORD[k]
        return Response({status:true,data:666})
View Code

數據加密

加密原理

通過rsa 模塊生成公鑰和私鑰分別保存在agent 與server 端,agent 通過公鑰加密數據傳輸到server 端通過私鑰解密。rsa 加密分為1024與2048兩種,生成公私鑰對象時指定參數即可 pub_key_obj, priv_key_obj = rsa.newkeys(1024) ,1024指的是能加密的數據的位數,換算成字節就是1024/8=128 bytes

代碼

生成公私鑰,分別保存在server/agent 兩端

技術分享圖片
# ######### 1. 生成公鑰私鑰 #########
pub_key_obj, priv_key_obj = rsa.newkeys(1024)                 # 1024/8 = 128 ,128 - 11 = 117
# 公鑰字符串
pub_key_str = pub_key_obj.save_pkcs1()
pub_key_code = base64.standard_b64encode(pub_key_str)

# 私鑰字符串
priv_key_str = priv_key_obj.save_pkcs1()
priv_key_code = base64.standard_b64encode(priv_key_str)

將生成的公私鑰(經過base64.standard_b64encode處理的)分別保存在agent/server 配置文件settings.py中

原生的公鑰:
    b-----BEGIN RSA PUBLIC KEY-----\nMIGJAoGBAJo2DEaukeIBTvc5vscIrh0gU79N+XRrf6NBGxGi6eOh7muzH3VV7UIn\nZvfUE3Nxu97DiMAC1u2JEudM8iatMChSLxSh9qFNB36ejz7dCi9DrAH6Ce46JZ7h\n+iwlo9x7Qr4uLJrQsHhia4/i89aAooNV6I8Ne+WOe3V1PvEUs+PhAgMBAAE=\n-----END RSA PUBLIC KEY-----\n
base64.standard_b64encode編碼處理後:
    bLS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JR0pBb0dCQUpvMkRFYXVrZUlCVHZjNXZzY0lyaDBnVTc5TitYUnJmNk5CR3hHaTZlT2g3bXV6SDNWVjdVSW4KWnZmVUUzTnh1OTdEaU1BQzF1MkpFdWRNOGlhdE1DaFNMeFNoOXFGTkIzNmVqejdkQ2k5RHJBSDZDZTQ2Slo3aAoraXdsbzl4N1FyNHVMSnJRc0hoaWE0L2k4OWFBb29OVjZJOE5lK1dPZTNWMVB2RVVzK1BoQWdNQkFBRT0KLS0tLS1FTkQgUlNBIFBVQkxJQyBLRVktLS0tLQo=
View Code

agent 端

技術分享圖片
import rsa
import base64
from config import settings
#數據加密函數
def encrypt(value_bytes):
    """
    rsa 公鑰加密
    :param value_bytes: 要加密的字節
    :return:
    """
    key_str = base64.standard_b64decode(settings.PUB_KEY)    #settings.py 中記錄了公鑰
    pk = rsa.PublicKey.load_pkcs1(key_str)
    #rsa 1024 能加密的數據大小為128(1024/8)字節,rsa 本身數據占用11字節,所以對大於128字節的數據分批次加密(每次加密117(128-11)字節)最後加密的數據拼接即可。
    data_list = []
    for i in range(0,len(value_bytes),117):
        chunk = value_bytes[i:i+117]
        result = rsa.encrypt(chunk, pk)
        data_list.append(result)

    return b‘‘.join(data_list)

#進行數據加密,並且發送到agent
ctime = int(time.time()*1000)
r1 = requests.post(
     url=self.asset_api,
     params = {sign:gen_sign(ctime),ctime:ctime},         #傳輸簽名密鑰
     data=encrypt(json.dumps(info).encode(utf-8)),          #傳輸數據rsa 加密
     headers={Content-Type:application/json}
    )
View Code

server 端

技術分享圖片
#使用私鑰解密的函數
import rsa
import base64
def decrypt(bytes_value):
    """
    rsa解密
    :param bytes_value: 要解密的數據為bytes 類型,因為rsa 只能對bytes 類型數據加密,所以解密數據為bytes類型
    :return: 解密完成的字節
    """
    key_str = base64.standard_b64decode(settings.PRIV_KEY)    #生成密鑰後使用base64.standard_b64encode對公鑰進行了編碼處理,所以此處要用decode 解碼
    pk = rsa.PrivateKey.load_pkcs1(key_str)                    #生成私鑰證書    
    #rsa 1024 能加密的數據大小為128(1024/8)字節,所以需要對大於128字節的數據分批次加密(每次加密128字節)最後加密的數據拼接即可。所以解密時也是每次解密128字節,最後拼接即可
    result = []
    for i in range(0,len(bytes_value),128):
        chunk = bytes_value[i:i+128]                        
        val = rsa.decrypt(chunk, pk)                        #通過公鑰證書對數據chunk 加密
        result.append(val)
    return b‘‘.join(result)

#利用私鑰解密的函數解密數據
class AssetView(APIAuthView):
    def post(self, request, *args, **kwargs):
        body = decrypt(request._request.body)           #通過decrypt 解密數據,body 中才是原生的數據
        asset_info = json.loads(body.decode(utf-8))   #解密後的數據解碼再序列化加載
View Code

API 接口認證與傳輸數據加密