Python UDP傳輸協議總結
阿新 • • 發佈:2019-01-24
本文索引
最近一段時間寫了一個涉及網路傳輸的專案,在這裡總結一下UDP傳輸協議,以及一個UDP協議輔助類。
總結經驗
1)udp傳輸包大小
報文大小:單個報文建議不超過約1.4K(乙太網MTU通常為1500位元組,超過後會被IP層分片,任一分片遺失都會導致整個報文丟失)
2)允許埠複用,否則重新綁定剛使用過的埠需要等待一段時間
self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
3)傳送報文速度
傳送報文速度上限與報文大小有一定關係,外網情況下1K的報文,服務端接收速度可以達到400個左右每秒,傳送過快反而會導致接收速度下降
4) 接收方緩衝區大小需比報文大小大,最好大個5-10倍,否則容易出現緩衝區溢位問題
5)udp協議是不可靠傳輸協議,不能保證到達另一端以及到達順序
程式碼實現
1)udp.py UDP協議輔助類
#coding=UTF-8
# UDP protocol helper
import time
import threading
import sys
from socket import *
from md5 import *
from tmp import *


class UDP(object):
    """Small datagram helper with a reader thread, a heartbeat and file transfer.

    Wire format of every datagram, as produced by ``__write``::

        <length>:::<protocol>:::<key>:::<value>

    where ``<protocol>`` is ``CustomProtocol`` (user messages, delivered through
    ``read``) or ``InsideProtocol`` (file segments and their acknowledgements),
    and ``<length>`` is ``len("<protocol>:::<key>:::<value>")``.

    NOTE(review): payloads are concatenated with ``str`` objects, i.e. the
    code treats ``str`` as a byte string the way Python 2 does -- confirm the
    target interpreter before running it elsewhere.
    """

    # Separator between the fields of a datagram.
    __separator = ":::"

    __useTime = 0
    # Seconds without any received packet before the channel is closed.
    __heartbeat_time = 0
    __last_response_time = 0
    __tmp = None
    __buffer = 1024
    __sock = None
    __isServer = False
    __reader_thread = None

    def __init__(self, host, port, buffer, isServer=False):
        """Open the socket and start the reader and heartbeat threads.

        host, port -- address to bind to (server) or to connect to (client).
        buffer     -- size passed to ``recvfrom`` and the file segment size.
        isServer   -- bind instead of connect when true.
        """
        self.__buffer = buffer
        self.__isServer = isServer
        self.__heartbeat_time = 1

        # Mutable state is created per instance; as class attributes these
        # lists would be shared by every UDP object in the process.
        self.__data_pool = ""
        self.__data_list = []
        self.__inside_data_list = []
        self.__received_file_segment = []
        self.__inside_data_file_list = []
        self.__wait_send_segment = []

        address = (host, port)
        self.__sock = socket(AF_INET, SOCK_DGRAM)
        # Allow the port to be reused; otherwise a recently used port is
        # unavailable for a while.
        self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        if isServer:
            try:
                self.__sock.bind(address)
            except Exception:
                # Binding failed: close the channel. The threads started
                # below see a dead socket and return immediately.
                self.disconnect()
        else:
            self.__sock.connect(address)

        # Listener thread
        self.__reader_thread = threading.Thread(target=UDP.readThread, args=(self,))
        self.__reader_thread.start()
        heartbeat_thread = threading.Thread(target=UDP.heartbeatThread, args=(self,))
        heartbeat_thread.start()

    def isAlive(self):
        """Return True while the socket is open."""
        return self.__sock is not None

    def heartbeatThread(self):
        """Close the channel once no packet has arrived for ``__heartbeat_time`` seconds."""
        self.__last_response_time = time.time()
        while self.__heartbeat_time > 0 and self.isAlive():
            if time.time() - self.__last_response_time > self.__heartbeat_time:
                # Heartbeat stopped: treat the peer as gone.
                self.disconnect()
                return
            time.sleep(0.1)

    def responsereceivedPacket(self, id, send_to):
        """Send the acknowledgement for file segment ``id`` to ``send_to``."""
        self.__write("receivedFileSegment", id, True, send_to)

    def saveTmpData(self, file_path):
        """Store all queued file segments and acknowledge them.

        Returns the number of payload bytes stored by this call. ``file_path``
        is accepted for interface compatibility and is not used.
        """
        transfer_byte = 0
        while self.__inside_data_file_list:
            data = self.__inside_data_file_list.pop(0)
            segment_id = data["ID"]
            value = data["VALUE"]
            send_to = data["SENDER_ADDR"]

            # Only store segments that have not been stored yet.
            if segment_id not in self.__received_file_segment:
                if self.__tmp.save(segment_id, value):
                    self.__received_file_segment.append(segment_id)
                    transfer_byte += len(value)

            # Acknowledge every segment we hold, duplicates included (the
            # earlier acknowledgement may have been lost), but never one
            # whose save failed, so the sender retransmits it.
            if segment_id in self.__received_file_segment:
                self.responsereceivedPacket(segment_id, send_to)
        return transfer_byte

    def __dealInsideProtocolData(self, data):
        """Handle one internal-protocol message (file segment or acknowledgement)."""
        sender_addr = data["SenderAddr"]
        key, found, value = data["Data"].partition(self.__separator)
        if not found:
            # Malformed message without a key/value separator: ignore it.
            return
        if "FileSegment_" in key:
            # A file data segment arrived; queue it for saveTmpData.
            segment_id = key.split("FileSegment_", 1)[1]
            self.__inside_data_file_list.append(
                {"SENDER_ADDR": sender_addr, "ID": segment_id, "VALUE": value})
        elif "receivedFileSegment" in key:
            # An acknowledgement arrived; stop resending that segment.
            try:
                segment = int(value)
            except ValueError:
                return
            if segment in self.__wait_send_segment:
                self.__wait_send_segment.remove(segment)

    def readThread(self):
        """Receive datagrams and dispatch them until the socket is closed."""
        sep = self.__separator
        while self.isAlive():
            try:
                recv_data, sender_addr = self.__sock.recvfrom(self.__buffer)
            except Exception:
                # Connection lost (or socket closed by another thread).
                self.disconnect()
                break
            self.__data_pool += recv_data
            while sep in self.__data_pool:
                parts = self.__data_pool.split(sep, 2)
                if len(parts) < 3:
                    # Not a complete header; drop it instead of crashing
                    # the reader thread.
                    self.__data_pool = ""
                    break
                length_text, protocol, rest = parts
                try:
                    length = int(length_text)
                except ValueError:
                    self.__data_pool = ""
                    break
                # The declared length also counts "<protocol>:::", which
                # split() has already removed from ``rest``.
                payload_len = max(0, length - len(protocol) - len(sep))
                packet_data = rest[:payload_len]
                self.__last_response_time = time.time()
                message = {"SenderAddr": sender_addr, "Data": packet_data}
                if protocol == "CustomProtocol":
                    # User-defined message
                    self.__data_list.append(message)
                else:
                    # Internal protocol message
                    self.__dealInsideProtocolData(message)
                self.__data_pool = rest[payload_len:]
            # Sleep for 1 ms between receives.
            time.sleep(0.001)

    def read(self):
        """Return the oldest pending user message, or None when there is none."""
        if self.__data_list:
            return self.__data_list.pop(0)
        return None

    def printTransProgress(self, had_transfer_size, file_size, start_time):
        """Write a one-line progress report to stdout.

        ``had_transfer_size`` is clamped to ``[0, file_size]``; reaching
        ``file_size`` ends the line with a newline instead of a carriage return.
        """
        needWrap = False
        if had_transfer_size < 0:
            had_transfer_size = 0
        elif had_transfer_size >= file_size:
            needWrap = True
            had_transfer_size = file_size
        used_time = time.time() - start_time
        # +1 keeps the divisor away from zero right after the start.
        speed = had_transfer_size / (used_time + 1)
        # Print transfer progress
        st = str(had_transfer_size) + "/" + str(file_size) + "byte|" + str(speed) + "byte/s "
        if needWrap:
            st += "[Completed] \n"
        else:
            st += "[Transfering]\r"
        sys.stdout.write(st)
        sys.stdout.flush()

    def readFile(self, file_path, file_size):
        """Receive a file of ``file_size`` bytes and assemble it at ``file_path``.

        Returns the result of ``TMP.merge`` on success, or False when the
        channel closed before all bytes arrived.
        """
        self.__tmp = TMP("./tmp/" + MD5(file_path) + "/")
        had_transfer_size = 0
        self.__received_file_segment = []
        start_time = time.time()
        while had_transfer_size < file_size and self.isAlive():
            transfer_byte = self.saveTmpData(file_path)
            had_transfer_size += transfer_byte
            if transfer_byte > 0:
                # Print transfer progress
                self.printTransProgress(had_transfer_size, file_size, start_time)
            else:
                # Nothing queued yet: yield instead of spinning on the CPU.
                time.sleep(0.001)
        if had_transfer_size == file_size:
            # Transfer succeeded. Push the heartbeat deadline 30 s into the
            # future, then merge the segments.
            self.__last_response_time = time.time() + 30
            return self.__tmp.merge(file_path, file_size)
        # Transfer failed
        self.__tmp.clean()
        return False

    def __write(self, key, value, isInsideProtocol=False, send_to=None):
        """Frame ``key``/``value`` and send it.

        Clients send to the connected peer; servers send to ``send_to`` and
        silently do nothing when it is not given. Returns the socket call's
        result, or None when nothing was sent.
        """
        # Take a local reference: another thread may close the channel
        # between the liveness check and the send.
        sock = self.__sock
        if sock is None:
            return
        if isInsideProtocol:
            key = "InsideProtocol:::" + key
        else:
            key = "CustomProtocol:::" + key
        data = key + ":::" + value
        data = str(len(data)) + ":::" + data
        if not self.__isServer:
            return sock.send(data)
        if send_to:
            return sock.sendto(data, send_to)

    def write(self, key, value):
        """Send a user message."""
        self.__write(key, value)

    def writeFile(self, file_path):
        """Send the file at ``file_path`` in segments, resending until each is acknowledged."""
        src = File(file_path, "rb")
        try:
            file_size = src.getSize()
            start_time = time.time()
            if file_size > 0:
                self.__wait_send_segment = list(range(0, (file_size // self.__buffer) + 1))
                while self.__wait_send_segment and self.isAlive():
                    # Iterate over a snapshot: the reader thread removes
                    # acknowledged segments from the live list.
                    for segment in list(self.__wait_send_segment):
                        if segment not in self.__wait_send_segment:
                            # Acknowledged while this pass was running.
                            continue
                        # Read the segment's bytes.
                        data = src.read(segment * self.__buffer, self.__buffer)
                        # Send the segment number together with its data.
                        self.__write("FileSegment_" + str(segment), data, True)
                        time.sleep(0.01)
                    time.sleep(0.01)
                    # Print transfer progress
                    had_transfer_size = file_size - len(self.__wait_send_segment) * self.__buffer
                    self.printTransProgress(had_transfer_size, file_size, start_time)
        finally:
            src.close()

    def disconnect(self):
        """Report the disconnect on stdout and close the channel, if still open."""
        if self.isAlive():
            sys.stdout.write("[Tip]Disconnect with remote\n")
            self.close()

    def close(self):
        """Close the UDP channel."""
        # Detach the socket first so a concurrent caller cannot close it twice.
        sock = self.__sock
        self.__sock = None
        if sock is not None:
            sock.close()
2)md5.py MD5加密計算輔助函式庫
#coding=utf-8 import hashlib import os #MD5輔助函式類 def MD5(value,isFile = False): if (isFile): return getFileMD5(value) else: return getStringMD5(value) #計算字串的MD5碼 def getStringMD5(string): myMd5 = hashlib.md5() myMd5.update(string) myMd5_Digest = myMd5.hexdigest() return myMd5_Digest #計算檔案的MD5碼 def getFileMD5(file_path): f = open(file_path,'rb') md5_obj = hashlib.md5() while True: d = f.read
3)tmp.py 臨時檔案輔助函式庫
#coding=UTF-8
import os
from file import *
# Temporary-file helper class
class TMP(object):
    """Stores numbered segments as ``<id>.tmp`` files in one directory and merges them.

    Creating an instance empties and removes the directory, so every
    transfer starts from a clean state.
    """

    __tmp_path = ""

    def __init__(self, tmp_path):
        """Remember ``tmp_path`` and remove anything left there from before."""
        self.__tmp_path = tmp_path
        self.clean()

    # Merge the temporary files
    def merge(self, file_save_path, file_size):
        """Concatenate ``0.tmp``, ``1.tmp``, ... into ``file_save_path``.

        The number of segments is the number of entries in the temporary
        directory. Returns True on success (the temporary directory is then
        removed) and False as soon as a segment in the sequence is missing.
        When ``file_size`` is 0 only an empty output file is produced.
        """
        fout = File(file_save_path, "wb")
        try:
            if file_size > 0:
                tmp_count = len(os.listdir(self.__tmp_path))
                for index in range(tmp_count):
                    tmp_file_name = os.path.join(self.__tmp_path, str(index) + ".tmp")
                    if not os.path.exists(tmp_file_name):
                        # A gap in the sequence: the output would be corrupt.
                        return False
                    fin = File(tmp_file_name, "rb")
                    try:
                        fout.write(fin.read())
                    finally:
                        fin.close()
        finally:
            # Close the output on every path, including the early return.
            fout.close()
        self.clean()
        return True

    # Remove the temporary files
    def clean(self):
        """Delete every file in the temporary directory, then the directory itself."""
        if os.path.exists(self.__tmp_path):
            for tmp_file in os.listdir(self.__tmp_path):
                # Delete the temporary file
                os.remove(os.path.join(self.__tmp_path, tmp_file))
            # Delete the temporary directory
            os.rmdir(self.__tmp_path)

    # Save one temporary file
    def save(self, id, data):
        """Write ``data`` to ``<id>.tmp`` and return whether the file exists afterwards.

        ``id`` is concatenated into the file name, so it must be a string.
        """
        # clean() removes the directory, so make sure it exists again.
        # NOTE(review): harmless if File already creates missing parent
        # directories -- confirm against the File helper.
        if not os.path.isdir(self.__tmp_path):
            os.makedirs(self.__tmp_path)
        tmp_path = os.path.join(self.__tmp_path, id + ".tmp")
        tmp_file = File(tmp_path, "wb")
        try:
            tmp_file.write(data)
        finally:
            tmp_file.close()
        return os.path.exists(tmp_path)