1. 程式人生 > 其它 >用Python做一個安全攻防工具:埠嗅探器(6)

用Python做一個安全攻防工具:埠嗅探器(6)

技術標籤:PySide2python安全python多執行緒PySide2

傳送門

本系列原創博文傳送門:

用Python做一個安全攻防工具:埠嗅探器(1)

用Python做一個安全攻防工具:埠嗅探器(2)

用Python做一個安全攻防工具:埠嗅探器(3)

用Python做一個安全攻防工具:埠嗅探器(4)

用Python做一個安全攻防工具:埠嗅探器(5)

用Python做一個安全攻防工具:埠嗅探器(6)

本章目標

利用PySide2的訊號和槽,在點選了GUI上的“啟動”按鈕後:

1、從介面上獲得各項輸入的引數資料

2、對引數進行轉換和輸出

3、進行合理範圍判斷,對於異常的資料進行彈窗處理

步驟實施

首先引入 Slot 模組,學過vue的話知道這個叫 插槽 。

from PySide2.QtCore import Slot  # 插槽模組

然後建立槽函式,用來接受點選了“啟動”按鈕後傳過來的資料:

@Slot()
def collect_data():
    print('clicked')

其中 “@Slot”是裝飾器,表明這是一個槽函式。

接下來是建立連線:

start_btn = QPushButton()  # 修改父類
start_btn.setText('啟動')
start_btn.clicked.connect(collect_data)  # 建立連線

測試一下,執行介面,然後點一下啟動按鈕,看看有沒有輸出:

很好,成功了。

這樣就開始試著去呼叫各個控制元件去輸出資料:

@Slot()
def collect_data():
    print(ip_line_edit.text())
    print(type(ip_line_edit.text()))
    print(thread_line_edit.text())
    print(type(thread_line_edit.text()))
    print(port_line_edit1.text())
    print(type(port_line_edit1.text()))
    print(port_line_edit2.text())
    print(type(port_line_edit2.text()))

列印獲得的資料,和判斷一下型別:

127.0.0.1
<class 'str'>
500
<class 'str'>
0
<class 'str'>
65535
<class 'str'>

成功接收到資料,那接下來開始做資料清洗的動作:

MESSAGE = (
    'ip或者網址不得為空',
    '併發數不得小於1',
    '開始埠號不得小於0',
    '結束埠號需要大於開始埠號'
)


def show_tip(message):  # 遇到問題,則丟到這裡,拋到介面上
    tip = QMessageBox(window)
    tip.setWindowTitle('提示')
    tip.setText(message)
    tip.show()


@Slot()
def collect_data():
    ip = ip_line_edit.text()
    thread_line = int(thread_line_edit.text())
    port_start = int(port_line_edit1.text())
    port_end = int(port_line_edit2.text())
    if not ip:
        show_tip(MESSAGE[0])
        return
    if thread_line < 1:
        show_tip(MESSAGE[1])
        return
    if port_start < 0:
        show_tip(MESSAGE[2])
        return
    if port_end <= port_start:
        show_tip(MESSAGE[3])
        return
    # TODO
    # 呼叫邏輯程式碼

這裡又引入了QMessageBox模組,用於錯誤資訊彈窗。

測試了下各種情況,符合預期,繼續下一步,封裝之前寫好的邏輯程式碼,方便呼叫:

PortSearch.py的程式碼

# coding=utf-8
import socket
import threading


class PortSearch:
    def __init__(self, ip='', thread_line=500, port_start=0, port_end=65535):
        self.global_list = []  # 收集開著的埠資訊
        self.global_err_list = []  # 收集關著的埠資訊
        self.global_thread_list = []  # 執行緒收集列表
        self.lock = threading.Lock()  # 建立一個鎖
        self.sem = threading.Semaphore(thread_line)  # 併發500
        self.ip = ip
        self.port_start = port_start
        self.port_end = port_end

    def port_search(self, _ip, _port):
        address = (_ip, _port)  # 地址必須是一個元祖,第一個是str,第二個是int
        client = socket.socket()  # 建立套接字
        code = client.connect_ex(address)
        self.lock.acquire()
        try:
            if code == 0:  # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
                self.global_list.append([_ip, _port, code, client.recv(1024)])
            else:
                self.global_err_list.append([_ip, _port, code])  # 此處不能呼叫 recv api,會丟擲異常
        except BaseException:
            print('進行列表操作時報錯')
        self.lock.release()
        client.close()  # 關閉連線

    def run(self):
        for i in range(self.port_start, self.port_end+1):  # 0~65535
            with self.sem:
                thread = threading.Thread(target=self.port_search, args=(self.ip, i))
                self.global_thread_list.append(thread)
                thread.start()

        for t in self.global_thread_list:  # 遍歷,等待所有程序執行完畢
            t.join()
        return self.global_list

繼續修改PortSearchGUI.py的程式碼,引入邏輯程式碼:

from PortSearch import PortSearch
import json  

在槽函式裡呼叫:

    search = PortSearch(ip=ip, thread_line=thread_line, port_start=port_start, port_end=port_end)
    result = search.run()
    type(result)
    print(result)
    # string = json.dumps(result)
    # report_box_edit.setPlainText(string)

結語

試執行,能出結果,還有一些問題待優化,比如點選按鈕之後工具會卡住,結果還不能輸出到大編輯框內。

這些問題都留到下個篇章處理。

今天的目標已經達到,而且超出。下個篇章主要做一些優化方面的事情,使得工具更好用。