Python網路質量測試工具增加亂序統計
阿新 • • 發佈:2018-11-12
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
半月月前,我用Python寫了一個工具,可以測試網路的純丟包率以及探測網路路徑中的佇列情況,經過一些使用者的反饋,還算比較好用,關於這個工具,請參見《 動手寫一個探測網路質量(丟包率/RTT/隊形等)的工具但是我覺得這個少了關於亂序度的測試功能,於是補充之。其實,在Linux的TC工具上,除了佇列,丟包率,延遲之外,亂序度也是一個非常重要的配置引數,不過請記住,Linux不是全部,對於程式設計師而言,除了抓包之外,瞭解一點Linux之外的東西,比如Cisco,運營商之類的,還是必要的。我至今依然記得網咖的網管搞定了一個東軟高階專家搞不定的問題的場景,這很正常,因為雖然網咖的網管可能沒什麼基礎知識,程式設計師同樣也沒有,網管的優勢在於,人家善於使用工具。兩個領域,方法論自然不同。
事實上關於亂序的測試統計非常簡單,這基於一個預期,就是“應答序列號一定大於等於傳送序列號”,一旦不滿足這個預期,說明有資料包亂序,在這個程式中,我主要統計兩個變數, 第一是總的亂序度數量,另一個是總的亂序次數。
統計邏輯如下:
變數: 1).當前應答的最高序列號hseq;2).當前應答的序列號seq;3).亂序度reorder;4).亂序次數reorder_cnt。
1>.如果seq大於等於hseq,則更新hseq為seq;
2>.否則計算delta=hseq-seq,redorder+=delta,reorder+=1。
這樣在程式的最後會統計出整個網路的亂序情況。詳細情況請看程式碼:
#!/usr/local/bin/pythonimport sysimport timefrom time import sleep,ctimeimport signalimport threadingfrom scapy.all import *target = sys.argv[1]tot = int(sys.argv[2])tot_per = int(sys.argv[3])vl = int(sys.argv[4])flt = "host " + target + " and icmp"handle = open("/dev/null", 'w')out_list = []in_list = []global posglobal currglobal tot_reorderglobal reorderpos = 0curr = 0tot_reorder = 0reorder = 0def output(): global pos global curr global tot_reorder global reorder all = out_list + in_list all.sort(lambda x,y:cmp(x[3],y[3])) for item in all: print item[0], item[1], item[2], item[3]*10 sys.stdout.flush() handle.write("\nReorder:" + str(reorder) + " Reorder cnt:" + str(tot_reorder) + "\n") os._exit(0)def signal_handler(signal, frame): handle.write("\nExit:" + ctime() + '\n') output()class ThreadWraper(threading.Thread): def __init__(self,func,args,name=''): threading.Thread.__init__(self) self.name=name self.func=func self.args=args def run(self): apply(self.func,self.args)def printrecv(pktdata): global pos global curr global tot_reorder global reorder if ICMP in pktdata and pktdata[ICMP]: seq = str(pktdata[ICMP].seq) if seq == tot_per + 2: return if str(pktdata[IP].dst) == target: handle.write('*') handle.flush() out_list.append(('+', 1, seq, time.clock())) else: if vl == 2: handle.write('.') else: handle.write('\b \b') handle.flush() in_list.append(('-', 0, seq, time.clock())) curr = int(seq) if curr >= pos: pos = curr else: delta = pos - curr tot_reorder += 1 reorder += delta def checkstop(pktdata): if ICMP in pktdata and pktdata[ICMP]: seq = str(pktdata[ICMP].seq) if int(seq) == tot_per + 2 and str(pktdata[IP].src) == target: handle.write("\nExit:" + ctime() + '\n') output() return True return False def send_packet(): times = 0 global pos global curr while times < tot: times += 1 send(IP(dst = target)/ICMP(seq = (0, tot_per))/"test", verbose = 0, loop = 1, count = 1) pos = 0 curr = 0 #out_list.append(('++++++++', 1, -1, str(time.clock()))) send(IP(dst = target)/ICMP(seq = tot_per+2)/"bye", verbose = 0)def recv_packet(): sniff(prn = printrecv, store = 1, filter = flt, stop_filter = checkstop)def startup(): handle.write("Start:" + ctime() + '\n') send_thread = ThreadWraper(send_packet,(),send_packet.__name__) send_thread.setDaemon(True) send_thread.start() recv_thread = ThreadWraper(recv_packet,(),recv_packet.__name__) recv_thread.setDaemon(True) recv_thread.start() signal.pause()if __name__ == '__main__': if vl != 0: handle.close() handle = sys.stderr signal.signal(signal.SIGINT, signal_handler) startup()
相關的更新已經更新到了 github