1. 程式人生 > 其它 >[Go語言]一種用於網遊伺服器的支援多路複用的網路協議處理框架

[Go語言]一種用於網遊伺服器的支援多路複用的網路協議處理框架

簡介:

本文描述了使用Go語言實現的、適應於Go語言併發模型的一種支援多路複用的網路協議處理框架,並提供了框架的程式碼實現。作者將這種框架用於網路遊戲伺服器中的協議處理,但也可用於其他領域。

應用背景:

在網路遊戲伺服器設計中,一般都會遇到協議多路複用的場景。比如登入伺服器和玩家客戶端之間有1:N的多個TCP連線;登入伺服器和遊戲伺服器之間是1:1的TCP連線。玩家登入遊戲的大致流程是這樣的:

  1. 玩家連線登入伺服器
  2. 登入伺服器向資料庫請求玩家資料
  3. 登入伺服器獲取到玩家資料,把玩家資料轉發給遊戲伺服器進行載入包括建立玩家物件等
  4. 登入伺服器獲取到載入成功迴應後,通知玩家客戶端可以進入遊戲世界

在3和4中,因為登入伺服器和遊戲伺服器通常只有一個TCP連線,所有玩家資料都是通過這個連線進行傳輸,所以需要從協議包中區分出是哪個玩家的資料。通常這個區分的依據可以是玩家的角色名,但是也可以更通用一些,用一個數字ID來區分,這樣就把協議包的分發處理和協議包中與遊戲邏輯有關的內容分離開來。

協議說明:

通常網遊的網路協議都是報文的形式,即使底層是使用TCP,也會用一些方法把資料拆分成一個個的報文(本文中稱為協議包)。因此,本文也基於這一假設,但是對於具體的協議包格式,本文沒有特別限制,只是要求協議包中能夠容納一個32位元組的ID。

協議包的處理大概可以分為以下兩種型別。其他更復雜的會話可以由以下兩種型別組合而成。

  1. 傳送一個數據包並等待迴應。比如登入伺服器等待遊戲伺服器載入玩家資料的結果通知。
  2. 傳送一個數據包,不需要回應。比如遊戲伺服器載入玩家資料後,給登入伺服器傳送結果通知。

框架說明:

Go語言是一種支援高併發的程式語言,它支援高併發的方式是大量輕量級的goroutine併發執行。在每個goroutine中的操作基本上都是同步阻塞的,這樣可以極大地簡化程式邏輯,使得程式碼清晰易讀,容易維護。基於這點,本文實現的框架的呼叫介面也是使用同步方式的。

  1. 如果一個協議包需要等待迴應,就在呼叫函式上阻塞等待。這個呼叫的簽名為: func (p *Connection) Query(data []byte) ([]byte, error) 注意:data的控制權會轉交給框架,因此函式呼叫後不能修改data的內容。
  2. 如果傳送一個協議包是對於接收到的某個協議包的迴應,則呼叫: func (p *Connection) Reply(query, answer []byte) error 注意:answer的控制權會轉交給框架,因此函式呼叫後不能修改answer的內容。
  3. 如果一個協議包不需要回應,就直接呼叫傳送函式: func (p *Connection) Write(data []byte) error 注意:data的控制權會轉交給框架,因此函式呼叫後不能修改data的內容。
  4. 呼叫者需要實現的介面:
  • Socket。用於協議包的收發。基本上是net.TCPConn的簡單封裝,在頭部加上一個協議包的長度。
  • DataHandler。用於協議處理,即沒有通過Query返回的協議包會分發給此介面處理。
  • ErrorHandler。用於錯誤處理。當斷線時,會呼叫此介面。
  • IdentityHandler。用於讀取和設定會話ID。

5. 關於goroutine安全的說明:

ErrorHandler和DataHandler的函式實現中不能直接呼叫(*Connection).Close,否則會導致死鎖。

匯出型別、函式和介面:
type Connection

func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, ehErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}

完整的程式碼實現:
package multiplexer

import (

"errors"
"sync"
"sync/atomic"
)
var (

ERR_EXIT = errors.New("exit")
)
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
type Connection struct {


conn Socket

wg sync.WaitGroup

mutex sync.Mutex

applicants map[uint32]chan []byte

chexit chan bool

chsend chan []byte

chch chan chan []byte

dh DataHandler

ih IdentityHandler

eh ErrorHandler

identity uint32

}
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler)*Connection {


count := maxcount

if count < 1024 {



count = 1024


}

chch := make(chan chan []byte, count)

for i := 0; i < count; i++ {



chch <- make(chan []byte, 1)


}

return &Connection{



conn: conn,


applicants: make(map[uint32]chan []byte, count),


chsend: make(chan []byte, count),


chexit: make(chan bool),


chch: chch,


dh: dh,


ih: ih,


eh: eh,


}

}

func (p *Connection) Start() {


p.wg.Add(2)

go func() {



defer p.wg.Done()



p.recv()


}()

go func() {



defer p.wg.Done()



p.send()


}()

}

func (p *Connection) Close() {


close(p.chexit)

p.conn.Close()

p.wg.Wait()

}

func (p *Connection) Query(data []byte) (res []byte, err error) {


var ch chan []byte

select {

case <-p.chexit:



return nil, ERR_EXIT


case ch = <-p.chch:



defer func() {




p.chch <- ch



}()


}

id := p.newIdentity()

p.ih.SetIdentity(data, id)

p.addApplicant(id, ch)

defer func() {



if err != nil {




p.popApplicant(id)



}


}()

if err := p.Write(data); err != nil {



return nil, err


}

select {

case <-p.chexit:



return nil, ERR_EXIT


case res = <-ch:



break


}

return res, nil

}
func (p *Connection) Reply(query, answer []byte) error {


// put back the identity attached to the query

id := p.ih.GetIdentity(query)

p.ih.SetIdentity(answer, id)

return p.Write(answer)

}
func (p *Connection) Write(data []byte) error {


select {

case <-p.chexit:



return ERR_EXIT


case p.chsend <- data:



break


}

return nil

}
func (p *Connection) send() {


for {



select {


case <-p.chexit:



return


case data := <-p.chsend:



if p.conn.Write(data) != nil {





return




}



}


}

}
func (p *Connection) recv() (err error) {


defer func() {



if err != nil {




select {



case <-p.chexit:





err = nil




default:





p.eh.OnError(err)




}



}


}()

for {



select {


case <-p.chexit:




return nil



default:




break



}


data, err := p.conn.Read()


if err != nil {




return err



}


if id := p.ih.GetIdentity(data); id > 0 {




ch, ok := p.popApplicant(id)



if ok {





ch <- data





continue




}



}


p.dh.Process(data)


}

return nil

}
func (p *Connection) newIdentity() uint32 {


return atomic.AddUint32(&p.identity, 1)

}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {


p.mutex.Lock()

defer p.mutex.Unlock()

p.applicants[identity] = ch

}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {


p.mutex.Lock()

defer p.mutex.Unlock()

ch, ok := p.applicants[identity]

if !ok {



return nil, false


}

delete(p.applicants, identity)

return ch, true

}