1. 程式人生 > >Python UDP傳輸協議總結

Python UDP傳輸協議總結

本文索引

最近一段時間寫了一個涉及網路傳輸的專案,在這裡總結一下UDP傳輸協議,以及一個UDP協議輔助類。

總結經驗



1)udp傳輸包大小

報文大小:最大為1.4K

2)允許埠複用,否則使用使用過的埠需要等待一段時間

self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

3)傳送報文速度

傳送報文速度上限與報文大小有一定關係,外網情況下1K的報文,服務端接收速度可以達到400個左右每秒,傳送過快反而會導致接收速度下降

4) 接收方緩衝區大小需比報文大小大,最好大個5-10倍,否則容易出現緩衝區溢位問題

5)udp協議是不可靠傳輸協議,不能保證到達另一端以及到達順序

程式碼實現



1)udp.py UDP協議輔助類

#UDP協議
#coding=UTF-8
import time
import threading
import sys
from socket import *
from md5 import *
from tmp import *

class UDP(object):
    __useTime = 0
    __heartbeat_time = 0
    __last_response_time = 0
    __tmp=None
    __data_pool = ""
    __data_list = []
    __inside_data_list = []
    __received_file_segment = []
    __inside_data_file_list = []
    __wait_send_segment = []
    __buffer=1024
    __sock=None
    __isServer = False
    __reader_thread = None

    def __init__(self,host,port,buffer,isServer=False):
        self.__buffer = buffer
        self.__isServer = isServer
        self.__heartbeat_time = 1

        address = (host,port)
        self.__sock = socket(AF_INET,SOCK_DGRAM)
        #允許埠複用,否則使用使用過的埠需要等待一段時間
        self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

        if (isServer):
            try:
                self.__sock.bind(address)
            except Exception:
                self.disconnect()
        else:
            self.__sock.connect(address)

        #監聽執行緒
        self.__reader_thread = threading.Thread(target=UDP.readThread, args=(self,))
        self.__reader_thread.start()

        heartbeat_thread = threading.Thread(target=UDP.heartbeatThread, args=(self,))
        heartbeat_thread.start()

    #socket是否可用
    def isAlive(self):
        return (self.__sock!=None)

    #心跳執行緒,檢測執行緒狀態
    def heartbeatThread(self):
        self.__last_response_time = time.time()
        while((self.__heartbeat_time>0)and(self.isAlive())):
            if (time.time() - self.__last_response_time>self.__heartbeat_time):
                #心跳停止,死亡
                self.disconnect()
                return 
            
            time.sleepƐ.1)
        

    #設定緩衝區大小(單位位元組)
    #def setBuffer(self,buffer):
    #    self.__buffer = buffer

    #返回已接受的資料包ID
    def responsereceivedPacket(self,id,send_to):
        self.__write("receivedFileSegment",id,True,send_to)
        

    #儲存臨時檔案
    def saveTmpData(self,file_path):
        transfer_byte = 0

        while(self.__inside_data_file_list):
            data = self.__inside_data_file_list.popƐ)
            id = data["ID"]
            value = data["VALUE"]
            send_to = data["SENDER_ADDR"]
            length = len(value)

            #儲存臨時資料,如果未儲存的資料包
            if (not id in self.__received_file_segment):
                flag = self.__tmp.save(id,value)
                if (flag):
                    self.__received_file_segment.append(id)
                    transfer_byte+=length

            #傳送接收反饋報文
            self.responsereceivedPacket(id,send_to)

        return transfer_byte
    
    #處理內部協議
    def __dealInsideProtocolData(self,data):
        sender_addr = data["SenderAddr"]
        key,value = data["Data"].split(":::",1)
        #print "[receive]",key

        if (key.find("FileSegment_")>=0):
            #接收到檔案資料包
            key=key.split("FileSegment_",1)[1]
            self.__inside_data_file_list.append({"SENDER_ADDR":sender_addr,"ID":key,"VALUE":value})
        elif (key.find("receivedFileSegment")>=0):
            #接收到接收反饋報文
            value = int(value)
            if (value in self.__wait_send_segment):
                self.__wait_send_segment.remove(value)

        

    #讀取資料到資料列表
    def readThread(self):
        while(self.isAlive()):
            try:
                recv_data,sender_addr = self.__sock.recvfrom(self.__buffer)
            except Exception:
                #掉線
                self.disconnect()
                break
            
            self.__data_pool+=recv_data

            while(self.__data_pool.find(':::')>=0):
                length,protocol,data = self.__data_pool.split(':::',2)
                length = int(length)
                packet_data = data[:length]

                self.__last_response_time = time.time()
                
                if (protocol=="CustomProtocol"):
                    #使用者自定義報文
                    self.__data_list.append({"SenderAddr":sender_addr,"Data":packet_data})
                else:
                    #內部協議報文
                    self.__dealInsideProtocolData({"SenderAddr":sender_addr,"Data":packet_data})

                self.__data_pool = data[length:]
            
            #休眠0.01秒
            time.sleepƐ.001)

    #讀取資料列表
    def read(self):
        if (self.__data_list):
            return self.__data_list.popƐ)
        else:
            return None

    #列印傳輸資訊
    def printTransProgress(self,had_transfer_size,file_size,start_time):
        
        needWrap=False

        if (had_transfer_size<0):
            had_transfer_size = 0
        elif (had_transfer_size>=file_size):
            needWrap = True
            had_transfer_size = file_size

        used_time = time.time() - start_time
        speed = had_transfer_size/(used_time+1)

        #答應傳輸進度
        st = str(had_transfer_size)+"/"+str(file_size)+"byte|"+str(speed)+"byte/s "

        if (needWrap):
            st+="[Completed]  \n"
        else:
            st+="[Transfering]\r"

        sys.stdout.write(st)
        sys.stdout.flush()

    #讀取檔案
    def readFile(self,file_path,file_size):
        self.__tmp = TMP("./tmp/"+MD5(file_path)+"/")
        
        had_transfer_size = 0
        self.__received_file_segment = []

        start_time = time.time()

        #print file_size
        while ((had_transfer_size<file_size) and (self.isAlive())):
            transfer_byte = self.saveTmpData(file_path)
            had_transfer_size+=transfer_byte

            #列印傳輸資訊
            if (transfer_byte>0):
                self.printTransProgress(had_transfer_size,file_size,start_time)
        
        if (had_transfer_size==file_size):
            #傳輸成功,組合資料
            self.__last_response_time = time.time()+30
            return self.__tmp.merge(file_path,file_size)
        else:
            #傳輸失敗
            self.__tmp.clean()
            return False

    #寫資料
    def __write(self,key,value,isInsideProtocol=False,send_to=None):
        if (not self.isAlive()):
            return 

        if (isInsideProtocol):
            key="InsideProtocol:::"+key
        else:
            key="CustomProtocol:::"+key

        #print "[send]",key

        data = key + ":::" + value
        data = str(len(data))+":::"+data

        if (not self.__isServer):
            return self.__sock.send(data)
        else:
            if (send_to):
                return self.__sock.sendto(data,send_to)

    def write(self,key,value):
        self.__write(key,value)

    #寫檔案
    def writeFile(self,file_path):
        file = File(file_path,"rb")
        file_size = file.getSize()

        start_time = time.time()

        if (file_size>0):
            self.__wait_send_segment = range(0,(file_size/self.__buffer)+1)

        while ((self.__wait_send_segment)and(self.isAlive())):
            send_segment = self.__wait_send_segment
            for segment in send_segment:
                #獲取檔案內容
                data = file.read(segment*self.__buffer,self.__buffer)
                #傳送資料包序號和資料
                self.__write("FileSegment_"+str(segment),data,True)
                time.sleepƐ.01)

            time.sleepƐ.01)

            #列印傳輸資訊
            had_transfer_size = file_size - len(self.__wait_send_segment) * self.__buffer
            self.printTransProgress(had_transfer_size,file_size,start_time)

    def disconnect(self):
        if (self.isAlive()):
            print "[Tip]Disconnect with remote"
            self.close()
            
    #關閉UDP通道
    def close(self):
        if (self.isAlive()):
            #self.__sock.shutdown()
            self.__sock.close()
            self.__sock=None


2)md5.py MD5加密計算輔助函式庫

#coding=utf-8
import hashlib
import os

#MD5輔助函式類
def MD5(value,isFile = False):
    if (isFile):
        return getFileMD5(value)
    else:
        return getStringMD5(value)

#計算字串的MD5碼
def getStringMD5(string):
    myMd5 = hashlib.md5()
    myMd5.update(string)
    myMd5_Digest = myMd5.hexdigest()
    return myMd5_Digest

#計算檔案的MD5碼
def getFileMD5(file_path):
    f = open(file_path,'rb')  
    md5_obj = hashlib.md5()
    while True:
      d = f.read

3)tmp.py 臨時檔案輔助函式庫

#coding=UTF-8
import os
from file import *

#臨時檔案輔助類
class TMP():
    __tmp_path = ""

    def __init__(self,tmp_path):
        self.__tmp_path = tmp_path
        self.clean()

    #整合臨時檔案
    def merge(self,file_save_path,file_size):
        #print self.__tmp_path
        fout = File(file_save_path,"wb")

        if (file_size>0):
            tmp_count = len(os.listdir(self.__tmp_path))

            for index in range(0,tmp_count):
                #print "merge tmp",index
                tmp_file_name = self.__tmp_path+"/"+str(index)+".tmp"
                if (os.path.exists(tmp_file_name)):
                    fin = File(tmp_file_name,"rb")
                    fout.write(fin.read())
                    fin.close()
                else:
                    return False

            self.clean()

        fout.close()
        return True

    #清空臨時檔案
    def clean(self):
        if (os.path.exists(self.__tmp_path)):
            for tmp_file in os.listdir(self.__tmp_path):
                #刪除臨時檔案
                tmp_file_name = self.__tmp_path+"/"+tmp_file
                os.remove(tmp_file_name)
            
            #刪除臨時檔案目錄
            os.rmdir(self.__tmp_path)

    #儲存臨時檔案
    def save(self,id,data):
        tmp_path = self.__tmp_path + "/" + id + ".tmp"
        #print "save tmp",tmp_path
        tmp_file = File(tmp_path,"wb")
        tmp_file.write(data)
        tmp_file.close()

        return os.path.exists(tmp_path)