用Python做一個安全攻防工具:埠嗅探器(6)
阿新 • • 發佈:2021-01-30
技術標籤:PySide2python安全python多執行緒PySide2
傳送門
本系列原創博文傳送門:
本章目標
利用PySide2的訊號和槽,在點選了GUI上的“啟動”按鈕後:
1、從介面上獲得各項輸入的引數資料
2、對引數進行轉換和輸出
3、進行合理範圍判斷,對於異常的資料進行彈窗處理
步驟實施
首先引入 Slot 模組,學過vue的話知道這個叫 插槽 。
from PySide2.QtCore import Slot # 插槽模組
然後建立槽函式,用來接受點選了“啟動”按鈕後傳過來的資料:
@Slot()
def collect_data():
print('clicked')
其中 “@Slot”是裝飾器,表明這是一個槽函式。
接下來是建立連線:
start_btn = QPushButton() # 修改父類
start_btn.setText('啟動')
start_btn.clicked.connect(collect_data) # 建立連線
測試一下,執行介面,然後點一下啟動按鈕,看看有沒有輸出:
很好,成功了。
這樣就開始試著去呼叫各個控制元件去輸出資料:
@Slot() def collect_data(): print(ip_line_edit.text()) print(type(ip_line_edit.text())) print(thread_line_edit.text()) print(type(thread_line_edit.text())) print(port_line_edit1.text()) print(type(port_line_edit1.text())) print(port_line_edit2.text()) print(type(port_line_edit2.text()))
列印獲得的資料,和判斷一下型別:
127.0.0.1
<class 'str'>
500
<class 'str'>
0
<class 'str'>
65535
<class 'str'>
成功接收到資料,那接下來開始做資料清洗的動作:
MESSAGE = (
'ip或者網址不得為空',
'併發數不得小於1',
'開始埠號不得小於0',
'結束埠號需要大於開始埠號'
)
def show_tip(message): # 遇到問題,則丟到這裡,拋到介面上
tip = QMessageBox(window)
tip.setWindowTitle('提示')
tip.setText(message)
tip.show()
@Slot()
def collect_data():
ip = ip_line_edit.text()
thread_line = int(thread_line_edit.text())
port_start = int(port_line_edit1.text())
port_end = int(port_line_edit2.text())
if not ip:
show_tip(MESSAGE[0])
return
if thread_line < 1:
show_tip(MESSAGE[1])
return
if port_start < 0:
show_tip(MESSAGE[2])
return
if port_end <= port_start:
show_tip(MESSAGE[3])
return
# TODO
# 呼叫邏輯程式碼
這裡又引入了QMessageBox模組,用於錯誤資訊彈窗。
測試了下各種情況,符合預期,繼續下一步,封裝之前寫好的邏輯程式碼,方便呼叫:
PortSearch.py的程式碼
# coding=utf-8
import socket
import threading
class PortSearch:
def __init__(self, ip='', thread_line=500, port_start=0, port_end=65535):
self.global_list = [] # 收集開著的埠資訊
self.global_err_list = [] # 收集關著的埠資訊
self.global_thread_list = [] # 執行緒收集列表
self.lock = threading.Lock() # 建立一個鎖
self.sem = threading.Semaphore(thread_line) # 併發500
self.ip = ip
self.port_start = port_start
self.port_end = port_end
def port_search(self, _ip, _port):
address = (_ip, _port) # 地址必須是一個元祖,第一個是str,第二個是int
client = socket.socket() # 建立套接字
code = client.connect_ex(address)
self.lock.acquire()
try:
if code == 0: # 當埠開啟時,錯誤碼是0,其他錯誤碼均表示未開該埠
self.global_list.append([_ip, _port, code, client.recv(1024)])
else:
self.global_err_list.append([_ip, _port, code]) # 此處不能呼叫 recv api,會丟擲異常
except BaseException:
print('進行列表操作時報錯')
self.lock.release()
client.close() # 關閉連線
def run(self):
for i in range(self.port_start, self.port_end+1): # 0~65535
with self.sem:
thread = threading.Thread(target=self.port_search, args=(self.ip, i))
self.global_thread_list.append(thread)
thread.start()
for t in self.global_thread_list: # 遍歷,等待所有程序執行完畢
t.join()
return self.global_list
繼續修改PortSearchGUI.py的程式碼,引入邏輯程式碼:
from PortSearch import PortSearch
import json
在槽函式裡呼叫:
search = PortSearch(ip=ip, thread_line=thread_line, port_start=port_start, port_end=port_end)
result = search.run()
type(result)
print(result)
# string = json.dumps(result)
# report_box_edit.setPlainText(string)
結語
試執行,能出結果,還有一些問題待優化,比如點選按鈕之後工具會卡住,結果還不能輸出到大編輯框內。
這些問題都留到下個篇章處理。
今天的目標已經達到,而且超出。下個篇章主要做一些優化方面的事情,使得工具更好用。