1. 程式人生 > >遊戲伺服器之長連線伺服器(python)(2)

遊戲伺服器之長連線伺服器(python)(2)

遊戲伺服器之長連線伺服器實現tcp連線的資料非同步收發。

一個網路收發處理程序,一個伺服器物件邏輯處理程序。兩個程序之間使用管道通訊。

網路收發處理程序:

(1)網路處理是由反應器的子執行緒來處理的。

(2)從管道的一端讀取資料,讀取管道後和網路傳送前,需要前處理連包和反序列化來檢查包的完整性。這裡還缺少合適的連包斷包處理。

(3)反應器的子執行緒接受網路資料再寫到管道。

伺服器物件邏輯處理程序:

(1)讀取網路資料並反序列化的是在伺服器物件程序的一個接受迴圈執行緒裡處理。並派送到伺服器物件的訊息佇列。

(2)反應器的處理定時器事件(在主執行緒)。

(3)主執行緒序列化併發送到管道。

這樣的好處是邏輯服務程序只處理邏輯相關的事情,網路收發處理

程序只是處理網路資料收發(可以使用兩個程序的兩個反應器分開處理,一個程序只可以使用一個反應器),可以非同步處理

缺點是網路收發處理程序和伺服器物件邏輯處理程序都需要連包和反序列化來檢查資料的完整性,使用管道也會有消耗。

總的來說,使用管道傳送侷限於單一物理機器,序列化反序列化次數增加,這種方法沒有提升效率。需要在網路收發程序增加連包段寶處理需要拓展到適用於伺服器叢集。

1、網路收發程序伺服器物件

(1)網路收發程序伺服器物件定義

class BaseNetServer:
    u"""
    . 子程序,模擬網路模組
    . 傳送訊息過程:從執行緒從管道中接收訊息,
    .                檢視訊息目標服務
    .                根據目標服務查詢通訊頻道
    .                呼叫通訊頻道傳送訊息
    . 接收訊息:通訊頻道將訊息回撥給網路服務
    .            網路服務將訊息通過管道傳送給主服務
    """
    def __init__(self, recv_pipe, send_pipe, conf):
         log.info(u"netserver init")
        self.recv_pipe = recv_pipe
        self.send_pipe = send_pipe
        
        self.btime = time.time()
        self.etime = 0
        self.count = 0
        
        self.conf = conf
        self.name = conf.myself.name
        self.channels = {} # server_name: channel
        self.route = {} # service_id: ServiceItem
        for s in conf.servers:
            for svcId, svc in s.services.items():
                self.route[svcId] = ServiceItem(svcId, s.name)
                
        # 建立工作執行緒
        self.thread = Thread(target=self.run)
        self.thread.setDaemon(True)
        self.thread.start()
        
        # 主執行緒loop,啟動網路模組
        print "------------------------------"
        self.start()
    # -------------------------------------------------------------------------
    def requestCreateChannel(self, target, transport):
        u"""
        . 網路層在網路協議建立後,請求伺服器物件建立和對端Server的通訊頻道
        1)伺服器物件建立Channel物件
        2)伺服器物件為每個服務設定通訊Channel"""
        channel = Channel(transport)
        self.channels[target] = channel
        for k, v in self.route.items():
            if v.serverName == target:
                v.channel = channel
                log.info(u"service (%d) is in the server(%s)", k, target)
        log.info(u"[%s] : server(%s) is connected", self.name, target)
        channel.setListener(self)
        return channel

    def requestDestroyChannel(self, target):
        u"""
        .網路層在網路協議斷連後,請求伺服器物件銷燬和對端Server的通訊頻道
        1)伺服器物件銷燬Channel物件
        2)伺服器物件設定相應服務Channel=None,後續發給該服務的訊息將無法送達
        """
        del self.channels[target]
        for k, v in self.route.items():
            if v.serverName == target:
                v.channel = None
        log.info(u"server(%s) is disconnected", target)
    
    def getChannel(self, serviceId):
        item = self.route[serviceId]
        if item != None:
            return item.channel
        return None
        
    # -------------------------------------------------------------------------
    def start(self):#建立需求的工廠
        conf = self.conf
        log.info(u"Setup network configuration....")
        for i in range(0, len(conf.servers)):
            if conf.servers[i].name == self.name:
                reactor.listenTCP(conf.servers[i].port, 
                                  ServerConnectionFactory(self, self.name))
                break
            else:
                log.info(u"conf.servers[i].ip: %s, port: %s", \
                  conf.servers[i].ip, conf.servers[i].port)
                reactor.connectTCP(self.conf.servers[i].ip, conf.servers[i].port,
                  ClientConnectionFactory(self, self.name))
                 
        log.info("network recv task run")
        try :
            reactor.run()
        except:
            reactor.stop()
    
    def run(self):#讀取管道資料併發送  
        log.info("network send task run")
        while True:
            try:
                data = self.send_pipe.recv()#接受到的邏輯程序發來的網路資料則轉發出去
                self.send(data)
            except:
                traceback.print_exc()
    
    def send(self, data):#管道傳送到邏輯處理程序
        u"""
        . 傳送訊息
        """
        self.count += 1
        if self.count%10000==0:
            log.info("network send event:%d", self.count)
            
        (rc, event) = PipeEvent.createEventFromStream(data)
        if rc:
            channel = self.getChannel(event.dstId)
            if channel!=None:
                channel.send(event.eventData)
            else:
                log.error("service:%d channel not find", event.dstId)
        else:
            log.error("create pipe event error")

    def on_event(self, event):
        u"""
        . event: PipeEvent
        """
        try:
            self.recv_pipe.send(event.eventData)
        except:
            traceback.print_exc()

(2)啟動網路收發程序伺服器物件

讀取配置檔案啟動子服務物件。

def start_net_server(recv_queue, send_queue, file):
    u"""建立子程序服務物件"""
    log.info(u"System Starting -> loading configuration %s" , file)
    conf = SystemConfig(file)
    net_server = BaseNetServer(recv_queue, send_queue, conf)

2、邏輯處理伺服器物件 

邏輯處理伺服器物件接受網路收發程序傳送來的資料,反序列化後檢查事件的合法性和完整性,然後派送到具體的邏輯服務並進行處理。
class TaskServer:
    u"""
    .伺服器物件,各個服務都通過伺服器物件,可進行
    1)服務之間通訊
    2)讀取配置

    .伺服器物件主要的功能包括
    1) 初始化
    2)讀取配置
    3)裝載服務
    4)實現通訊機制

    .伺服器維護以下關鍵資料結構
    1)路由表:當收到事件後需要查詢路由表尋找事件的接收伺服器
    2)服務表:該表儲存服務資料,服務和網路的關係
    3)通訊頻道表:該表儲存當前的通道資料資訊
    """
    def __init__(self, conf, file):#file配置檔案,conf配置檔案的解析物件
        self.conf = conf
        self.route = {} # service_id: ServiceItem
        self.services = {} # service_id: service object
        self.lock = Lock()
        self.name = conf.myself.name
        
        self.recv_pipe, self.child_recv_pipe = Pipe()#接收管道的兩端
        self.send_pipe, self.child_send_pipe = Pipe()#傳送管道的兩端
        self.network = Process(target=start_net_server, args=(self.child_recv_pipe, \
                                                        self.child_send_pipe, file))#管道網路程序處理收發,程序是建立子程序伺服器物件
        self.network.start()#啟動網路程序
        self.init_server_data()
        # create route table
        for s in conf.servers:
            for svcId, svc in s.services.items():
                self.route[svcId] = ServiceItem(svcId, s.name)
        
        global SERVER
        SERVER = self
        
    def init_server_data(self):
        log.info("begin init data")
        ServerData.init_load()
        log.info("end init data")
                
    def getServerConfig(self, serverName):
        u"""獲得指定伺服器的配置資訊"""
        for server in self.conf.servers:
            if server.name == serverName:
                return server
        return None
    def getServiceConfig(self, serviceId):
        u"""獲得指定服務的配置資訊"""
        return self.conf.myself.services[serviceId]
                
    # -----  service ----------------------------------------------------------
    def registeService(self, service):
        u"""服務註冊函式,服務註冊成功後,則可以收發訊息"""
        self.services[service.serviceId] = service
    
    def unregisteService(self, service):
        u"""服務登出函式,服務註冊成功後,則可以收發訊息"""
        del self.services[service.serviceId]
    
    # ----- event ------------------------------------------------------------
    
    def onChannelEvent(self, event):
        u"""
        Channel收到相應的事件後將呼叫伺服器物件的onChannelEvent
       . 伺服器物件根據dstId,轉發給相應的本地服務
        """
        self._dispatchEvent(event)
        
    def _sendEvent(self, buf):#通過管道程序來發送
        self.send_pipe.send(buf)
               
    def sendToPipe(self, event):#管道傳送
        u"""event:"""
        self.send_pipe.send(event.toStream())#reactor.callFromThread(self._sendEvent, event.toStream())

    def sendEvent(self, eventData, srcId= -1, dstId= -1, eventType= -1, \
                            param1= -1, param2= -1, senceId=-1, origEvent=None):
        u"""
        .可通過呼叫本函式,傳送事件給指定服務
        1)伺服器物件首先檢查目標服務的位置
        2)對於遠端服務,通過Channel物件傳送事件至對端伺服器
        3)對於本地服務,則直接轉發
        eventData : 傳送的資料
        srcId : 源服務的標記,如origEvent不為空,則為origEvent.dstId
        dstId : 目標服務的標記,如origEvent不為空,則為origEvent.srcId
        eventType: 事件  型,如果不填寫則為-1
        origEvent: 為源事件
        """
        # check whether event should be sent over network or not
        if origEvent != None:
            srcId = origEvent.dstId
            dstId = origEvent.srcId
            param1 = origEvent.param1
            param2 = origEvent.param2
            senceId = origEvent.senceId
            
        item = self.route[dstId]

        if item == None:
            log.warning(u"Event(%s) Missing due to no this service(%d)", \
              eventData, dstId)
            return - 1

        event = Event.createEvent(srcId, dstId, eventType, param1, param2, eventData, senceId)     
        if item.serverName == self.name: # local event
            self._dispatchEvent(event)
        else:
            self.sendToPipe(event)#不是本伺服器程序的傳送到別的伺服器程序
        return event.tranId
    
    def notifyEvent(self, userId, data, event_type, sceneId=0, serviceId=0):
        u"""
        . 傳送通知事件給客戶端
        """
        self.sendEvent(data, serviceId, NOTIFY_SERVICE, \
                     NOTIFY_SERVICE_SENDTO_CLIENT.REQ, userId, event_type, sceneId)

    def _dispatchEvent(self, event):
        u"""分發事件至本地服務的函式"""
        svc = self.services[event.dstId]
        if svc != None:
            try :
                svc.dispatch(event)
            except Full, e:
                log.warning(u"Event(%s) Missing due to queue is full", event) 
        else:
            log.warning(u"Event(%s) Missing due to no this service", event)

    def _dispatchTimerEvent(self, event):
        u"""分發時間事件至本地服務的函式"""
        svc = self.services[event.dstId]
        if svc != None:
            svc.dispatchTimerEvent(event)
        else:
            log.warning(u"Event(%s) Missing due to no this service", event)
            
    def localBroadcastEvent(self, eventType, param1, param2, eventData):
        u"""實現本地服務廣播"""
        for id, svc in self.services.items():
            evt = Event.createEvent(-1, svc.serviceId, eventType, param1, \
              param2, eventData)
            self._dispatchEvent(evt)
    # ----- timer -------------------------------------------------------------


    def setTimerByFunc(self, delay, func, *args, **kw):
        u"""設定定時器,並通過另外的函式喚醒"""
        reactor.callLater(delay, func, args, kw)
    
    def setTimerByClientEvent(self, delay, sid, event):
        u"""
        .設定定時器,直接分發呼叫者傳入的事件,也是通過訊息佇列的方式返回給客戶端
        """
        def onTime(server, sid, evt):
            svc = self.services[sid]
            if svc != None:
                svc.dispatchTimerEvent(evt)
            else:
                log.warning(u"Event(%s) Missing due to no this service", event)
            
        return reactor.callLater(delay, onTime, self, sid, event)


    def setTimerByEvent(self, delay, sid, eventData):
        u"""設定定時器的函式"""
        event = Event.createEvent(0, sid, 1, -1, -1, eventData)
        def onTime(server, evt):
            self._dispatchTimerEvent(evt)

        return reactor.callLater(delay, onTime, self, event)
    
    # ----- run --------------------------------------------------------------
        
    def installServices(self):
        u"""通過配置檔案的資訊,啟動,初始化和註冊服務"""
        for svcId, svc in self.conf.myself.services.items():
            name = svc.options["module"]
            __import__(name)
            module = sys.modules[name]
            log.info(u"install service : %s", name)
            print "----- install service:", name
            
            self.registeService(eval(svc.options["code"])) # TODO: ?
    
    def run(self, file):
        u"""
        .主迴圈函式
        1)建立 網路執行緒
        2) 監聽接收管道
        """
        self.thread = Thread(target=self.recv_loop)#
        self.thread.setDaemon(True)
        self.thread.start()
        log.info("reactor run")
        try :
            reactor.run()
        except:
            reactor.stop()

    def recv_loop(self):
        log.info("recv loop")
        while True:
            try:
                data = self.recv_pipe.recv()#監聽管道的事件
                (rc,event) = Event.createEventFromStream(data)
                if rc:
                    self.onChannelEvent(event)#把訊息派送到服務的訊息佇列
                else:
                    log.error("create event error")
            except:
                traceback.print_exc()
            
    def stop(self):
        u"""停止各個服務"""
        for service in self.services.values():
            service.stop()
            
        #time.sleep(5)
        for service in self.services.values():
            self.unregisteService(service)
        log.debug("stop server")

3、會話

class Channel:
	u"""
	.Channel類用於和網路上其它伺服器進行通訊,對於每個連線的Server,本伺服器會維護一個Channel例項
	.用於接收和傳送相關事件訊息
	.Channel會註冊至伺服器上,從網路上收到訊息後會緩衝以及解析,生成事件物件並交伺服器處理
	.Channel會打包傳送的資料,並通過Twisted提供的傳輸機制來發送資料,Channel物件不進行資料快取,
	.Twisted會解決傳送過程中的快取問題
	"""
	def __init__(self, transport):
	u"""
	.通過建立的網路傳輸物件,初始化Channel例項
	"""
	self.transport = transport
	self.buffer = ""
	self.listener = None

	def send(self, buf):
		u"""
		. 傳送訊息方法
		. 該方法會使用反應器的callFromThread方法進行處理
		. 1) 解決執行緒安全的問題
		. 2) 解決網路傳送的問題
		. 注意:trasport.write方法不是執行緒安全的方法
		"""
		#self._sendEvent(buf)
		reactor.callFromThread(self._sendEvent, buf) # NOTE: reactor保證執行緒安全

	def _sendEvent(self, buf):
		self.transport.write(buf)

	def writeBuffer(self, data):#接收資料,檢查完整性,寫到管道
	    u"""
	    . 在網路上收到訊息後,該方法將被呼叫
	    . 該方法快取並解析事件,同時交監聽器處理
	    """
	    self.buffer += data
	    while True: 
		(result, event) = PipeEvent.createEventFromStream(self.buffer)#檢查事件合法性(這裡暫時不能優化,需要在管道保證訊息資料接收完整性)
		if result:
			self.buffer = self.buffer[event.length:]
		if (self.listener != None):
		    self.listener.on_event(event)
		else:
		    return

	def setListener(self, listener):
	    u""" listener: server.onChannelEvent """
	    self.listener = listener

4、管道事件

class PipeEvent:
	u"""
		. 程序管道傳輸事件
		"""
	def __init__(self, ver, length, sid, did, eventType, eventData):
		self.version = ver
		self.length = length
		self.srcId = sid
		self.dstId = did
		self.eventType = eventType
		self.eventData = eventData
			
	@staticmethod
	def createEventFromStream(data):
		u"""
		.return: (boolean, Event | None)
		"""
		l = len(data)
		if l < lengthOfHeader:
			return (False, None)
		(ver, length, sid, did, eventType) = struct.unpack_from(formatOfHeader, data)
		if l < length:
			return (False, None)
		return (True, PipeEvent(ver, length, sid, did, eventType, data[:length]))

相關推薦

遊戲伺服器連線伺服器python2

遊戲伺服器之長連線伺服器實現tcp連線的資料非同步收發。 一個網路收發處理程序,一個伺服器物件邏輯處理程序。兩個程序之間使用管道通訊。 網路收發處理程序: (1)網路處理是由反應器的子執行緒來處理的。 (2)從管道的一端讀取資料,讀取管道後和網路傳送前,需要前處理連包和反

如何實現android和伺服器保持連線

            這種功能實際上就是資料同步,同時要考慮手機本身、電量、網路流量等等限制因素,所以通常在移動端上有一下兩個解決方案:   1.一種是定時去server查詢資料,通常是使用HTTP協議來訪問web伺服器,稱Polli

Web 通訊 連線輪詢long polling

基於HTTP的長連線,是一種通過長輪詢方式實現"伺服器推"的技術,它彌補了HTTP簡單的請求應答模式的不足,極大地增強了程式的實時性和互動性。 一、什麼是長連線、長輪詢? 用通俗易懂的話來說,就是客戶端不停的向伺服器傳送請求以獲取最新的資料資訊。這裡的“不停

[轉]opencv3 圖像處理 圖像縮放 python與c++實現

space original 註意 libs 波紋 輸出 uil iostream 3.5 轉自:https://www.cnblogs.com/dyufei/p/8205121.html 一. 主要函數介紹 1) 圖像大小變換 cvResize () 原型: void

Pthon學習路 第四篇 Python基礎

pri bsp programs -s alt 如果 lex class 算數運算 1.運算符:+ - *(乘法) /(除法) %(求余) //(求商) **(求冪) 2.成員運算:in not in:判斷單個字符或者子序列在不在字符串中。(n

連線是如何實現的不看後悔,一看必懂

  在HTTP1.0和HTTP1.1協議中都有對長連線的支援。其中HTTP1.0需要在request中增加”Connection: keep-alive“ header才能夠支援,而HTTP1.1預設支援.  http1.0請求與服務端的互動過程: &nbs

Android架構連線技術

本文首發於小專欄《Android 架構之長連線技術》,更多Android架構文章歡迎關注《億級Android架構》 上一篇文章《Android 架構之網路框架(上)》中,我們談過了網路框架OkHttp、網路加速方案如HttpDNS、資料壓縮與序列化等技術點。本文我們結合騰訊Mars框架和美團Sha

linux伺服器A遠端連線伺服器B的mysql及1045錯誤

伺服器A上已經存在定時執行備份mysql的指令碼,最近伺服器B又新起了一個專案,也需要備份資料。圖方便,打算直接都在A的指令碼中執行。 指令碼如下:     dateStr=`date +"%y%m%d-%H%M%S"` if [ ! -z "${1}" ]; then mkdir

react-native開發專案連線夜神模擬器步驟window

這裡是window為準, 因為mac電腦 夜神模擬器暫時沒有搖一搖功能 連線夜神模擬器 adb.exe connect 127.0.0.1:62001 返回 connected to 127.0.0.1:62001 說明 連線成功!!!! cmd 開啟命令編輯器 進入專案目錄 win

VB實現自動取得伺服器IP並連線伺服器(利用UDP廣播)

普通的講解WINSOCK控制元件等網路控制元件的用法的資料中,無論是使用TCP還是UDP,客戶端程式連線服務端程式時,無一例外的要指定IP地址(或機器名)和埠號,那麼,在埠已知而服務端程式所在機器地址或名稱未知的情況下,怎樣實現客戶端程式自動取得服務端IP並建立連線呢,筆者

JAVA連線、短連線和心跳包

短連線: client向server發起連線,server接到請求,雙方建立連線,client向server傳送訊息,server迴應client,一次讀寫完成雙方都可以發起close請求 優點:短連線對於伺服器來說較為簡單,存在的連線都是有用的連線,不需要額外的控制。

tensorflow學習識別單張圖片的實現python手寫數字

假設我們已經安裝好了tensorflow。 一般在安裝好tensorflow後,都會跑它的demo,而最常見的demo就是手寫數字識別的demo,也就是mnist資料集。 然而我們僅僅是跑了它的demo而已,可能很多人會有和我一樣的想法,如果拿來一張數字圖片,如何應用我們訓

Python入門小練習2

python入門小練習 用戶密碼登錄三次鎖定 用戶密碼登錄三次鎖定案例需求1.輸入用戶名密碼 2.認證成功後顯示歡迎信息 3.輸錯三次後鎖定實現思路: 1.判斷用戶是否在黑名單,如果在黑名單提示賬號鎖定。 2.判斷用戶是否存在,如果不存在提示賬號不存在。 3.判斷賬號密碼是否正確,如果正確登

零基礎掌握百度地圖興趣點獲取POI爬蟲python語言爬取基礎篇

region map 基礎 輸入 filter put mark page -h 實現目的:爬取昆明市範圍內的全部中學數據,包括名稱、坐標。 先進入基礎篇,本篇主要講原理方面,並實現步驟分解,為python代碼編寫打基礎。 因為是0基礎開始,所以講得會比較詳細。 如實現目的

Python 標準庫一覽Python進階學習

日誌配置 -html 令行 鼓勵 python.h 垃圾 eap form types 轉自:http://blog.csdn.net/jurbo/article/details/52334345 寫這個的起因是,還是因為在做Python challenge的時候,

python入門Python和Pycharm安裝

不能 速查 ins ase 技術 ati scroll env 按鈕 Python簡介 Python是一種計算機程序設計語言,它結合了解釋性、編譯性、互動性和面向對象的腳本語言,非常簡單易用。Python 的設計具有很強的可讀性,相比其他語言經常使用英文關鍵字,其他語言

天天向上的力量 IIIpython在pycharm實現

其中 提高 範圍 pri python pos 學習 沒有 用戶輸入 ‘‘‘描述一年365天,以第1天的能力值為基數,記為1.0。當好好學習時,能力值相比前一天提高N‰;當沒有學習時,能力值相比前一天下降N‰。每天努力或放任,一年下來的能力值相差多少呢?其中,N的取值範圍是

數據結構-冒泡排序Python&java實現

冒泡排序1. 冒泡算法的核心思想冒泡排序的核心思想就是掃描數據清單,找到亂序的兩個相鄰的數據進行兩兩比較並交換位置,然後繼續掃描數據,接著反復重復上述的操作直至排序結束。2. 示例我們以23,15,58,-4,99,0這組無序的數字為例:例子為從小到大排序,初始狀態:23,15,58, -4,99

數據結構-插入排序Python&java實現

數據結構 java python 插入排序1.插入排序的工作原理插入排序的工作原理是通過構建有序序列,對於未排序數據,在已排序序列中從後向前掃描,找到相應位置並插入。2.示例我們以3,0,89,67,-2,9這組無序的數字為例:例子為從小到大排序,初始狀態 3 0 89 67 -2 9

Make your own neural networkPython神經網路程式設計

  這本書應該算我第一本深度學習的程式碼入門書了吧,之前看阿里云云棲社和景略集智都有推薦這本書就去看了,   成功建立了自己的第一個神經網路,也瞭解一些關於深度學習的內容,再加上這學期的概率論與數理統計的課,   現在再來看李大大的機器學習課程,終於能看懂LogisticsRegression概率那部分公