API 接口認證與傳輸數據加密
應用場景
cmdb 這類項目的資產入庫等操作,當agent 與server 端通過api 進行數據交互時,為了安全采取了兩項安全措施:1、server 端需要對agent 端進行身份驗證(避免有冒充agent 請況);2、當agent 采集數據提交到server 端過程需要對數據加密,數據傳輸到agent 端時再數據解密(防止數據傳輸過程被截取,泄露數據)。
api 身份認證
認證原理
server 與 agent保存相同的字符串密鑰,agent 通過密鑰生成簽名值,把數據與簽名值發送給server 端,server 根據自己的配置文件保存的密鑰生成簽名值與agent 端簽名值比對,兩者一致則認證通過否則失敗。
安全加強(只針對數據傳輸中簽名被截取安全隱患,暫不考慮server與agent 本地保存的字符串密鑰安全隱患)
簽名值一次性
agent 每次發送過來的簽名值都會被server 記錄,每次發送的簽名值如果已經使用過則認證失敗(防止傳輸中被截取後黑客可以永久使用被截取的簽名密鑰來認證)
動態生成簽名
由於“簽名值一次性”所以簽名密鑰必須動態生成,每次用不同的密鑰。agent 每次生成簽名前生成一個時間值,把時間與保存的字符串密鑰組合在一起再生成一個簽名值。簽名值變成了動態。
簽名超時設置
agent 每次發送簽名與生成簽名的時間到server ,server 端接收時會再本段生成一個時間,將兩個時間值比較,當時間差超過10s ,則認證失敗。(“簽名值一次性”中對認證過的簽名值記錄在了字典中以{"client_time":"簽名密鑰"}形式存儲,為了避免此字典無線增大占用內存,所以會對字典中的時間判斷,超過10s 的數據進行刪除,但是刪除的簽名數據再有相同的簽名數據來認證時就不能通過“簽名一次性”來排除了,所以“簽名超時設置”解決了此問題)
代碼
agent 端
import requests import time import hashlib def gen_sign(ctime): key = "uiakjsdfasjdf898" #此字符串在server端也保存一份 val = "%s|%s" %(key,ctime) #加入時間,動態生成簽名密鑰 obj = hashlib.md5() #View Code生成一個對象 obj.update(val.encode(‘utf-8‘)) #md5只能對字節形式的數據進行加密 return obj.hexdigest() ctime = int(time.time()*1000) #*1000單位變ms,int 轉化為整數(去掉小數點後的數字,而不是四舍五入) result = requests.post( url = ‘http://127.0.0.1:8000/api/test/‘, #註意路徑結尾一定更要以/結尾 params = {‘sign‘:gen_sign(ctime),‘ctime‘:ctime} #相當於在url = ‘http://.../?key="alskdjflskdfjkjk"‘,也就是在url中以get形式發送數據 ) print(result.url,result.text) #認證邏輯:agent 把簽名密鑰與生成簽名密鑰的時間一並發送到server端,server 端通過自身保存的key與agent 的時間生成簽名值,兩個簽名比對進行認證
server 端(view.py)
import json import hashlib #此模塊裏有md5的類 import time from django.shortcuts import render,HttpResponse from django.views import View from django.conf import settings from rest_framework.views import APIView from rest_framework.response import Response #生成簽名的函數 def gen_sign(ctime): """ 生成簽名 :param ctime: :return: """ val = ‘%s|%s‘ %(settings.URL_AUTH_KEY,ctime,) obj = hashlib.md5() #實力化一個md5對象 obj.update(val.encode(‘utf-8‘)) #md5對象只能對bytes格式數據加密 return obj.hexdigest() #返回生成的簽名值 #所有的驗證成功的簽名密鑰都會記錄在SIGN_RECORD 中 SIGN_RECORD = {} class TestView(APIView): def post(self,request): print("請求來了") #由於繼承了APIView所以此處的request 不是django原生的request,原生的為request._request client_sign = request._request.GET.get(‘sign‘) #獲取agent發來的簽名密鑰 client_ctime = int(request._request.GET.get(‘ctime‘)) #獲取agent 發來的時間 server_time = int(time.time() * 1000) #記錄當前時間,*1000單位變成ms #認證第一關 if server_time - client_ctime > 5000: #比對兩個時間,超過5s,則認證失敗 return Response({‘status‘: ‘false‘, ‘error‘: "路上時間太久了"}) # 認證第二關 if client_sign in SIGN_RECORD: #已經認證通過的簽名會保存在SING_RECORD,新來密鑰如果被認證過了就會認證失敗 return Response("簽名已經被使用過了") #認證第三關 server_sign = gen_sign(client_ctime) #server 端生成簽名 if client_sign != server_sign: #密鑰比對來通過認證 return Response({‘status‘:‘false‘,‘error‘:403}) #認證通過進行以下操作 SIGN_RECORD[client_sign] = client_ctime #通過認證的密鑰存儲起來,下次防止密鑰重復使用 #防止SIGN_RECORD無線增大,對於5s 之前的記錄刪除。對於已經認證通過並存儲在該記錄中的密鑰,超過10s刪除後, ## 那麽認證第二關就失效了,所以認證第一關就是為此而設 for k in list(SIGN_RECORD.keys()): v = SIGN_RECORD[k] if server_time - v > 5000: print("已經超過5s") del SIGN_RECORD[k] return Response({‘status‘:‘true‘,‘data‘:666})View Code
數據加密
加密原理
通過rsa 模塊生成公鑰和私鑰分別保存在agent 與server 端,agent 通過公鑰加密數據傳輸到server 端通過私鑰解密。rsa 加密分為1024與2048兩種,生成公私鑰對象時指定參數即可 pub_key_obj, priv_key_obj = rsa.newkeys(1024) ,1024指的是能加密的數據的位數,換算成字節就是1024/8=128 bytes
代碼
生成公私鑰,分別保存在server/agent 兩端
# ######### 1. 生成公鑰私鑰 ######### pub_key_obj, priv_key_obj = rsa.newkeys(1024) # 1024/8 = 128 ,128 - 11 = 117 # 公鑰字符串 pub_key_str = pub_key_obj.save_pkcs1() pub_key_code = base64.standard_b64encode(pub_key_str) # 私鑰字符串 priv_key_str = priv_key_obj.save_pkcs1() priv_key_code = base64.standard_b64encode(priv_key_str) 將生成的公私鑰(經過base64.standard_b64encode處理的)分別保存在agent/server 配置文件settings.py中 原生的公鑰: b‘-----BEGIN RSA PUBLIC KEY-----\nMIGJAoGBAJo2DEaukeIBTvc5vscIrh0gU79N+XRrf6NBGxGi6eOh7muzH3VV7UIn\nZvfUE3Nxu97DiMAC1u2JEudM8iatMChSLxSh9qFNB36ejz7dCi9DrAH6Ce46JZ7h\n+iwlo9x7Qr4uLJrQsHhia4/i89aAooNV6I8Ne+WOe3V1PvEUs+PhAgMBAAE=\n-----END RSA PUBLIC KEY-----\n‘ base64.standard_b64encode編碼處理後: b‘LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JR0pBb0dCQUpvMkRFYXVrZUlCVHZjNXZzY0lyaDBnVTc5TitYUnJmNk5CR3hHaTZlT2g3bXV6SDNWVjdVSW4KWnZmVUUzTnh1OTdEaU1BQzF1MkpFdWRNOGlhdE1DaFNMeFNoOXFGTkIzNmVqejdkQ2k5RHJBSDZDZTQ2Slo3aAoraXdsbzl4N1FyNHVMSnJRc0hoaWE0L2k4OWFBb29OVjZJOE5lK1dPZTNWMVB2RVVzK1BoQWdNQkFBRT0KLS0tLS1FTkQgUlNBIFBVQkxJQyBLRVktLS0tLQo=‘View Code
agent 端
import rsa import base64 from config import settings #數據加密函數 def encrypt(value_bytes): """ rsa 公鑰加密 :param value_bytes: 要加密的字節 :return: """ key_str = base64.standard_b64decode(settings.PUB_KEY) #settings.py 中記錄了公鑰 pk = rsa.PublicKey.load_pkcs1(key_str) #rsa 1024 能加密的數據大小為128(1024/8)字節,rsa 本身數據占用11字節,所以對大於128字節的數據分批次加密(每次加密117(128-11)字節)最後加密的數據拼接即可。 data_list = [] for i in range(0,len(value_bytes),117): chunk = value_bytes[i:i+117] result = rsa.encrypt(chunk, pk) data_list.append(result) return b‘‘.join(data_list) #進行數據加密,並且發送到agent ctime = int(time.time()*1000) r1 = requests.post( url=self.asset_api, params = {‘sign‘:gen_sign(ctime),‘ctime‘:ctime}, #傳輸簽名密鑰 data=encrypt(json.dumps(info).encode(‘utf-8‘)), #傳輸數據rsa 加密 headers={‘Content-Type‘:‘application/json‘} )View Code
server 端
#使用私鑰解密的函數 import rsa import base64 def decrypt(bytes_value): """ rsa解密 :param bytes_value: 要解密的數據為bytes 類型,因為rsa 只能對bytes 類型數據加密,所以解密數據為bytes類型 :return: 解密完成的字節 """ key_str = base64.standard_b64decode(settings.PRIV_KEY) #生成密鑰後使用base64.standard_b64encode對公鑰進行了編碼處理,所以此處要用decode 解碼 pk = rsa.PrivateKey.load_pkcs1(key_str) #生成私鑰證書 #rsa 1024 能加密的數據大小為128(1024/8)字節,所以需要對大於128字節的數據分批次加密(每次加密128字節)最後加密的數據拼接即可。所以解密時也是每次解密128字節,最後拼接即可 result = [] for i in range(0,len(bytes_value),128): chunk = bytes_value[i:i+128] val = rsa.decrypt(chunk, pk) #通過公鑰證書對數據chunk 加密 result.append(val) return b‘‘.join(result) #利用私鑰解密的函數解密數據 class AssetView(APIAuthView): def post(self, request, *args, **kwargs): body = decrypt(request._request.body) #通過decrypt 解密數據,body 中才是原生的數據 asset_info = json.loads(body.decode(‘utf-8‘)) #解密後的數據解碼再序列化加載View Code
API 接口認證與傳輸數據加密