1. 程式人生 > >HTTPS代理明文截獲之證書偽造(Python)

HTTPS代理明文截獲之證書偽造(Python)

為了防止網路中的明文資訊的在傳輸過程中被截獲,SSL/TSL為各種應用協議加密封裝提供了一個不錯的解決方案,作為一種公開的加密協議,採用非對稱加密的方式來傳輸金鑰無疑為提供了不錯的安全保障。由於現今的計算機還未達到在有限時間破解出私鑰(RSA),所以想成功的截獲明文資訊,只能通過中間人偽造證書這種方式了,但是SSl/TSL協議在設計時就考慮到這一點,採用了CA機構證書籤名的方式來防止這個問題,因此要想做到真正的做到證書偽造,除非你有可信任的CA機構的私鑰(或者應用本身設計存在缺陷),這樣可以為自己簽發證書,正常情況下我們是不可能弄到這些的。說了這麼多其實想說明SSL/TSL協議至少目前來說還是安全的,所以想截獲HTTPS的明文,你先得匯入事先自己製作好的CA根證書(怎麼製作根證書以及簽發證書等等,可以檢視下Openssl的資料),廢話不多說具體看程式碼,只是通過一個簡單http代理來體現,程式碼比較簡陋,作用只是實現HTTPS的證書偽造的具體過程。

import os
import socket  
import threading  
import re  
import time  
import random
import ssl  
from M2Crypto import X509, EVP, RSA, ASN1  


CACerFile='ca.cer'  
  
CAKeyFile='ca.key'  

StoreFolder='certs/'

mutex = threading.Lock()

def gen_rand_serial(len):
    num=''
    nlist= random.sample(['1','2','3','4','5','6','7','9','0','a', 'b', 'c', 'd','e','f'], len)
    for n in nlist :
        num+=str(n) 
	return int(num.encode('hex'),16)
	
def mk_cert():  
    
    serial = gen_rand_serial(4)
    cert = X509.X509()   
    cert.set_serial_number(serial)  
    cert.set_version(2)  
    mk_cert_valid(cert)  
    cert.add_ext(X509.new_extension('nsComment', 'SSL sever'))  
    return cert  
  
def mk_cert_valid(cert, days=180):  
  
    t = long(time.time())  
    now = ASN1.ASN1_UTCTIME()  
    now.set_time(t - 24*60*60)  
    expire = ASN1.ASN1_UTCTIME()  
    expire.set_time(t + days * 24 * 60 * 60)  
    cert.set_not_before(now)  
    cert.set_not_after(expire)  
  
  
def mk_request(bits, cn='localhost'):  
  
    pk = EVP.PKey()  
    x = X509.Request()  
    rsa = RSA.gen_key(bits, 65537, lambda: None)  
    pk.assign_rsa(rsa)  
    x.set_pubkey(pk)  
    name = x.get_subject()  
    name.C = "CN"  
    name.CN = cn  
    name.ST = 'TS'  
    name.O = 'TS'  
    name.OU = 'TS'  
    x.sign(pk,'sha1')  
    return x, pk  
  
def mk_self_cert(cacert_file, ca_key_file, cn):  
  
    cert_req, pk2 = mk_request(2048, cn=cn)  
      
    if cacert_file and ca_key_file:  
        cacert = X509.load_cert(cacert_file)  
        pk1 = EVP.load_key(ca_key_file)  
    else:  
        cacert = None  
        pk1 = None  
      
    cert = mk_cert()  
    cert.set_subject(cert_req.get_subject())  
    cert.set_pubkey(cert_req.get_pubkey())  
      
    if cacert and pk1:  
        cert.set_issuer(cacert.get_issuer())  
        cert.sign(pk1, 'sha256')  
    else:  
        cert.set_issuer(cert.get_subject())  
        cert.sign(pk2, 'sha256')  
          
    with open(StoreFolder+cn+'.cer', 'w') as f:  
        f.write(cert.as_pem())  
    with open(StoreFolder+cn+'.key', 'w') as f:  
        f.write(pk2.as_pem(None))  
  
  
def RecviceMessage(ss):  
    head=''  
    method=''  
    isFrist=True  
    while (True):  
        try:  
            buf= ss.recv(2048)        
            if(len(buf)>0):   
                head+=buf  
            else:  
                break  
            if isFrist:  
                i=head.find(' ')  
                method=head[0:i]  
                isFrist=False  
        except Exception,e:    
                print "Recvice Browser Data Fail"  
                break  
                            
        if("\r\n\r\n" in head):   
            patten=method+'+( http| https)(://)+([^/])+(/)'              
            reobj = re.compile(patten)    
            result, number = reobj.subn(method+' /', head)  
            req=result.replace("Proxy-Connection:","Connection:")  
            print req  
  
            if method=="CONNECT":  
				#t = threading.Thread(target=FakeHttps,args=(result,ss))  
				#t.start()  
                FakeHttps(result,ss)  
            else:  
                #t = threading.Thread(target=ForWardHttp,args=(result,ss))  
                #t.start()
                ForWardHttp(result,ss)
                  
def ForWardHttp(msg,ss):  
    host=''  
    port=80  
    patten2=r'(Host: )+(\S+)'  
    searchObj2 = re.search( patten2, msg, re.M|re.I)  
    if searchObj2:    
         host=searchObj2.group(2)  
         #print host  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   
    if(isinstance(ss,ssl.SSLSocket)):   
       sock=ssl.wrap_socket(sock)  
       port=443  
          
    try:  
        ip=socket.gethostbyname(host)   
        sock.connect((ip, port))  
    except Exception,e:  
        print e,"Connect Fail"  
        return   
          
    sock.send(msg)  
    while (True):  
        try:  
            rec=sock.recv(2048)  
            if(len(rec)>0):  
                ss.send(rec)  
            else:  
                break  
                 
        except Exception,e:  
            print e,'Recvice Data Fail'  
            sock.close()  
            ss.close()  
            break  
  
def FakeHttps(result,ss):  
    index=result.find(':')  
      
    Host=result[len('CONNECT '):index]  
      
    ss.send('HTTP/1.1 200 Connection Established\r\n\r\n')  
    
    mutex.acquire()
    mk_self_cert(CACerFile,CAKeyFile,Host)                    #Make a self signed certificate  
    mutex.release()
    #os.system('FakeSSL.exe '+Host)
    conn = ssl.wrap_socket(ss,keyfile=StoreFolder+Host+".key",certfile=StoreFolder+Host+".cer",server_side=True)  
  
    RecviceMessage(conn)  
  
  
if __name__ == "__main__":  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    sock.bind(('0.0.0.0',8080))  
    sock.listen(50)
    while(True):  
        clientSock, address = sock.accept()   
        t = threading.Thread(target=RecviceMessage,args=(clientSock,))  
        t.start()  

設定代理,開啟google,發現網站證書已經被成功替換

明文請求資訊