1. 程式人生 > 其它 >用Python做一個安全攻防工具:埠嗅探器(2)

用Python做一個安全攻防工具:埠嗅探器(2)

技術標籤:安全滲透測試pythonpython多執行緒列表

這一篇主要實現埠的自動遍歷。

根據埠的百科資料,一個ip的可用埠數是 2^16 = 65536 。

那麼遍歷一個ip的埠,就需要呼叫那麼多次埠訪問。因為經常要呼叫埠訪問的程式碼塊,我們先把這部分程式碼提取成一個函式。

# coding=utf-8
import socket


def port_search(ip, port):
    address = (ip, port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    err = client.connect_ex(address)
    if err == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
        print(client.recv(1024))
    client.close()  # 關閉連線
    print(err)


port_search(ip='110.242.68.4', port=22)

考慮到埠數量眾多,我們看一下執行一次的時間,引入time模組:

# coding=utf-8
import socket
import time


def port_search(ip, port):
    address = (ip, port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    err = client.connect_ex(address)
    if err == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
        print(client.recv(1024))
    client.close()  # 關閉連線
    print(err)


start_time = time.time()
port_search(ip='110.242.68.4', port=22)
print('消耗時間為: ', time.time()-start_time)

試了下,不開放的埠耗時:

將port值修改為80:

發現每一個埠都耗時很久,假設按20秒計算,遍歷完所有埠耗時=65536*20秒……好像有點久!

為了加快遍歷速度,我們引入執行緒threading模組(教程見此),執行緒的意義就是併發,而非序列。

第一階段,加入埠遍歷後,對於port_search函式通過實現執行緒的建立和執行來呼叫。

# coding=utf-8
import socket
import threading


def port_search(_ip, _port):
    address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    err = client.connect_ex(address)
    if err == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
        print(client.recv(1024))
    client.close()  # 關閉連線
    print(err)


ip = '110.242.68.4'
for i in range(65536):  # 0~65535
    thread = threading.Thread(target=port_search, args=(ip, i))
    thread.start()

這個程式還有問題,若是執行完,有很多個輸出,輸出很雜亂,需要將資訊歸整,修改之後如下:

import socket
import threading


global_list = []  # 收集開著的埠資訊
global_err_list = []  # 收集關著的埠資訊


def port_search(_ip, _port):
    address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    code = client.connect_ex(address)
    if code == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
        global_list.append([_ip, _port, code, client.recv(1024)])
    else:
        global_err_list.append([_ip, _port, code])  # 此處不能呼叫 recv api,會丟擲異常
    client.close()  # 關閉連線


ip = '110.242.68.4'
for i in range(65536):  # 0~65535
    thread = threading.Thread(target=port_search, args=(ip, i))
    thread.start()

print(global_list)
print(global_err_list)

到這裡之後,還有兩個潛在的問題:

1、這兩個全域性列表被併發訪問,需要加一個鎖;

2、列印的時候,有些執行緒沒有結束,所以會錯過一些資訊,需要使用join阻塞來等待所有執行緒結束。

再次修改如下:

# coding=utf-8
import socket
import threading


global_list = []  # 收集開著的埠資訊
global_err_list = []  # 收集關著的埠資訊
global_thread_list = []  # 執行緒收集列表
lock = threading.Lock()  # 建立一個鎖


def port_search(_ip, _port):
    address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    code = client.connect_ex(address)
    lock.acquire()
    if code == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
        global_list.append([_ip, _port, code, client.recv(1024)])
    else:
        global_err_list.append([_ip, _port, code])  # 此處不能呼叫 recv api,會丟擲異常
    lock.release()
    client.close()  # 關閉連線


ip = '110.242.68.4'
for i in range(65536):  # 0~65535
    thread = threading.Thread(target=port_search, args=(ip, i))
    global_thread_list.append(thread)
    thread.start()

for t in global_thread_list:  # 遍歷,等待所有程序執行完畢
    t.join()

print(global_list)
print(global_err_list)

走讀程式碼時發現有個風險點,如果在加鎖和解鎖之前的程式碼段丟擲異常,一個執行緒崩潰,會導致鎖無法解開,修改如下:

# coding=utf-8
import socket
import threading


global_list = []  # 收集開著的埠資訊
global_err_list = []  # 收集關著的埠資訊
global_thread_list = []  # 執行緒收集列表
lock = threading.Lock()  # 建立一個鎖


def port_search(_ip, _port):
    address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    code = client.connect_ex(address)
    lock.acquire()
    try:
        if code == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
            global_list.append([_ip, _port, code, client.recv(1024)])
        else:
            global_err_list.append([_ip, _port, code])  # 此處不能呼叫 recv api,會丟擲異常
    except BaseException:
        print('進行列表操作時報錯')
    lock.release()
    client.close()  # 關閉連線


ip = '110.242.68.4'
for i in range(65536):  # 0~65535
    thread = threading.Thread(target=port_search, args=(ip, i))
    global_thread_list.append(thread)
    thread.start()

for t in global_thread_list:  # 遍歷,等待所有程序執行完畢
    t.join()

print(global_list)
print(global_err_list)

初步試運行了一下,卡的不行。加一個最大執行緒限制:

# coding=utf-8
import socket
import threading


global_list = []  # 收集開著的埠資訊
global_err_list = []  # 收集關著的埠資訊
global_thread_list = []  # 執行緒收集列表
lock = threading.Lock()  # 建立一個鎖
sem = threading.Semaphore(500)  # 併發500


def port_search(_ip, _port):
    address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
    client = socket.socket()  # 建立套接字
    code = client.connect_ex(address)
    lock.acquire()  # 鎖定
    try:
        if code == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
            global_list.append([_ip, _port, code, client.recv(1024)])
        else:
            global_err_list.append([_ip, _port, code])  # 此處不能呼叫 recv api,會丟擲異常
    except BaseException:
        print('進行列表操作時報錯')
    lock.release()  # 釋放鎖
    client.close()  # 關閉連線


ip = '110.242.68.4'
for i in range(65536):  # 0~65535
    with sem:  # 併發限制
        thread = threading.Thread(target=port_search, args=(ip, i))
        global_thread_list.append(thread)
        thread.start()

for t in global_thread_list:  # 遍歷,等待所有程序執行完畢
    t.join()

print(global_list)
# print(global_err_list)

遍歷下來後,輸出結果為:

[['110.242.68.4', 80, 0, b''], ['110.242.68.4', 443, 0, b'']]

就是web服務http和https兩大協議使用的埠。

好了,這個核心邏輯應該就沒啥問題了。

但是這種使用者介面就太LOW了吧?所以後面我們將使用PySide2來製作工具介面。

下一篇將開始接觸PySide2。