1. 程式人生 > >驗證客戶端的連結合法性

驗證客戶端的連結合法性

一、os.urandom(n)

    獲取一種bytes型別的隨機生成的n個位元組字串的方法,每次生成的值都不相同。

二、hmac

    hmac模組實現了標準的Hmac演算法,首先需要準備待計算的原始訊息key,和隨機生成的msg,雜湊演算法,採用MD5,使用hmac的程式碼如下:

import hmac

key = b"Hello World"
msg = b"secret"

h = hmac.new(key, msg, digestmod='MD5')
print(h.hexdigest())  # 8a315e3168dcd347b8bb3e2517da7b4d

注:傳入的key和msg必須為bytes型別。

三、驗證客戶端的連結合法性

    如果服務端想限制客戶端訪問,就必須對客戶端進行認證,認證通過方可訪問。否則客戶端只需知道服務端ip和埠號就能對其進行訪問。

    如果你想在分散式系統中實現一個簡單的客戶端連結認證功能,又不像SSL那麼複雜,那麼利用hmac + 加鹽的方式來實現

服務端:

import socket
import hmac
import os


secret_key = b"Jedan has a big key!"  #


def conn_auth(conn):
    
""" 認證客戶端連結 :param conn:管道 :return:True/False """ print("開始驗證新連結的合法") msg = os.urandom(32) # 生成一個32位元組的bytes型別隨機字串 conn.sendall(msg) # 傳送隨機字串 h = hmac.new(secret_key, msg) digest = h.digest() # 得到密文 response = conn.recv(len(digest)) # 接收客戶端密文 # hmac.compare_digest(密文,密文),比較客戶端密文與服務端密文是否相同,相同返回True,不同返回False
return hmac.compare_digest(response, digest) def data_handler(conn, bufsize=1024): """ 判斷連結是否合法 :param conn: 管道 :param bufsize: 接收資料長度 :return: None """ if not conn_auth(conn): print("該連結不合法") conn.close() return print("連結合法") while 1: data = conn.recv(bufsize) if not data: break conn.sendall(data.upper()) def server_handler(ip_port, bufsize, backlog=5): """ 只處理連結 :param ip_port:地址(ip+port) :param bufsize:接收資料長度 :param backlog:監聽數 :return:data_handler(conn, bufsize) """ tcp_socket_server = socket.socket() # 建立socket物件 tcp_socket_server.bind(ip_port) # 繫結埠 tcp_socket_server.listen(backlog) # 監聽 while 1: conn, addr = tcp_socket_server.accept() print("新連線[%s:%s]" % (addr[0], addr[1])) data_handler(conn, bufsize) if __name__ == '__main__': ip_port = ("127.0.0.1", 8001) bufsize = 1024 server_handler(ip_port, bufsize)
server端

客戶端:

import socket
import hmac


secret_key = b"Jedan has a big key!"  #


def conn_auth(conn):
    """
    驗證客戶端到伺服器端的連結
    :param conn:
    :return:
    """
    msg = conn.recv(32)
    h = hmac.new(secret_key, msg)
    digest = h.digest()
    conn.sendall(digest)  # 傳送客戶端密文


def client_handler(ip_port, bufsize=1024):
    """
    傳送密文驗證,驗證成功實現通訊
    :param ip_port:地址
    :param bufsize:資料接收長度
    :return:None
    """
    tcp_socket_client = socket.socket()
    tcp_socket_client.connect(ip_port)

    conn_auth(tcp_socket_client)

    while 1:
        data = input(">>>(q退出)").strip()
        if not data:
            continue
        if data.upper() == "Q":
            break

        tcp_socket_client.sendall(data.encode("utf-8"))
        response = tcp_socket_client.recv(bufsize)
        print(response.decode("utf-8"))

    tcp_socket_client.close()


if __name__ == '__main__':
    ip_port = ("127.0.0.1", 8001)
    bufsize = 1024
    client_handler(ip_port, bufsize)
client端