用Python做一個安全攻防工具:埠嗅探器(2)
阿新 • • 發佈:2020-12-29
這一篇主要實現埠的自動遍歷。
根據埠的百科資料,一個ip的可用埠數是 2^16 = 65536 。
那麼遍歷一個ip的埠,就需要呼叫那麼多次埠訪問。因為經常要呼叫埠訪問的程式碼塊,我們先把這部分程式碼提取成一個函式。
# coding=utf-8 import socket def port_search(ip, port): address = (ip, port) # 地址必須是一個元祖,第一個是str,第二個是int client = socket.socket() # 建立套接字 err = client.connect_ex(address) if err == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠 print(client.recv(1024)) client.close() # 關閉連線 print(err) port_search(ip='110.242.68.4', port=22)
考慮到埠數量眾多,我們看一下執行一次的時間,引入time模組:
# coding=utf-8 import socket import time def port_search(ip, port): address = (ip, port) # 地址必須是一個元祖,第一個是str,第二個是int client = socket.socket() # 建立套接字 err = client.connect_ex(address) if err == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠 print(client.recv(1024)) client.close() # 關閉連線 print(err) start_time = time.time() port_search(ip='110.242.68.4', port=22) print('消耗時間為: ', time.time()-start_time)
試了下,不開放的埠耗時:
將port值修改為80:
發現每一個埠都耗時很久,假設按20秒計算,遍歷完所有埠耗時=65536*20秒……好像有點久!
為了加快遍歷速度,我們引入執行緒threading模組(教程見此),執行緒的意義就是併發,而非序列。
第一階段,加入埠遍歷後,對於port_search函式通過實現執行緒的建立和執行來呼叫。
# coding=utf-8 import socket import threading def port_search(_ip, _port): address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int client = socket.socket() # 建立套接字 err = client.connect_ex(address) if err == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠 print(client.recv(1024)) client.close() # 關閉連線 print(err) ip = '110.242.68.4' for i in range(65536): # 0~65535 thread = threading.Thread(target=port_search, args=(ip, i)) thread.start()
這個程式還有問題,若是執行完,有很多個輸出,輸出很雜亂,需要將資訊歸整,修改之後如下:
import socket
import threading
global_list = [] # 收集開著的埠資訊
global_err_list = [] # 收集關著的埠資訊
def port_search(_ip, _port):
address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int
client = socket.socket() # 建立套接字
code = client.connect_ex(address)
if code == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
global_list.append([_ip, _port, code, client.recv(1024)])
else:
global_err_list.append([_ip, _port, code]) # 此處不能呼叫 recv api,會丟擲異常
client.close() # 關閉連線
ip = '110.242.68.4'
for i in range(65536): # 0~65535
thread = threading.Thread(target=port_search, args=(ip, i))
thread.start()
print(global_list)
print(global_err_list)
到這裡之後,還有兩個潛在的問題:
1、這兩個全域性列表被併發訪問,需要加一個鎖;
2、列印的時候,有些執行緒沒有結束,所以會錯過一些資訊,需要使用join阻塞來等待所有執行緒結束。
再次修改如下:
# coding=utf-8
import socket
import threading
global_list = [] # 收集開著的埠資訊
global_err_list = [] # 收集關著的埠資訊
global_thread_list = [] # 執行緒收集列表
lock = threading.Lock() # 建立一個鎖
def port_search(_ip, _port):
address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int
client = socket.socket() # 建立套接字
code = client.connect_ex(address)
lock.acquire()
if code == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
global_list.append([_ip, _port, code, client.recv(1024)])
else:
global_err_list.append([_ip, _port, code]) # 此處不能呼叫 recv api,會丟擲異常
lock.release()
client.close() # 關閉連線
ip = '110.242.68.4'
for i in range(65536): # 0~65535
thread = threading.Thread(target=port_search, args=(ip, i))
global_thread_list.append(thread)
thread.start()
for t in global_thread_list: # 遍歷,等待所有程序執行完畢
t.join()
print(global_list)
print(global_err_list)
走讀程式碼時發現有個風險點,如果在加鎖和解鎖之前的程式碼段丟擲異常,一個執行緒崩潰,會導致鎖無法解開,修改如下:
# coding=utf-8
import socket
import threading
global_list = [] # 收集開著的埠資訊
global_err_list = [] # 收集關著的埠資訊
global_thread_list = [] # 執行緒收集列表
lock = threading.Lock() # 建立一個鎖
def port_search(_ip, _port):
address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int
client = socket.socket() # 建立套接字
code = client.connect_ex(address)
lock.acquire()
try:
if code == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
global_list.append([_ip, _port, code, client.recv(1024)])
else:
global_err_list.append([_ip, _port, code]) # 此處不能呼叫 recv api,會丟擲異常
except BaseException:
print('進行列表操作時報錯')
lock.release()
client.close() # 關閉連線
ip = '110.242.68.4'
for i in range(65536): # 0~65535
thread = threading.Thread(target=port_search, args=(ip, i))
global_thread_list.append(thread)
thread.start()
for t in global_thread_list: # 遍歷,等待所有程序執行完畢
t.join()
print(global_list)
print(global_err_list)
初步試運行了一下,卡的不行。加一個最大執行緒限制:
# coding=utf-8
import socket
import threading
global_list = [] # 收集開著的埠資訊
global_err_list = [] # 收集關著的埠資訊
global_thread_list = [] # 執行緒收集列表
lock = threading.Lock() # 建立一個鎖
sem = threading.Semaphore(500) # 併發500
def port_search(_ip, _port):
address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int
client = socket.socket() # 建立套接字
code = client.connect_ex(address)
lock.acquire() # 鎖定
try:
if code == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
global_list.append([_ip, _port, code, client.recv(1024)])
else:
global_err_list.append([_ip, _port, code]) # 此處不能呼叫 recv api,會丟擲異常
except BaseException:
print('進行列表操作時報錯')
lock.release() # 釋放鎖
client.close() # 關閉連線
ip = '110.242.68.4'
for i in range(65536): # 0~65535
with sem: # 併發限制
thread = threading.Thread(target=port_search, args=(ip, i))
global_thread_list.append(thread)
thread.start()
for t in global_thread_list: # 遍歷,等待所有程序執行完畢
t.join()
print(global_list)
# print(global_err_list)
遍歷下來後,輸出結果為:
[['110.242.68.4', 80, 0, b''], ['110.242.68.4', 443, 0, b'']]
就是web服務http和https兩大協議使用的埠。
好了,這個核心邏輯應該就沒啥問題了。
但是這種使用者介面就太LOW了吧?所以後面我們將使用PySide2來製作工具介面。
下一篇將開始接觸PySide2。