1. 程式人生 > 其它 >golang遊戲伺服器框架_Go開源遊戲伺服器框架——Pitaya

golang遊戲伺服器框架_Go開源遊戲伺服器框架——Pitaya

技術標籤:golang遊戲伺服器框架

簡介

Pitaya是一款由國外遊戲公司topfreegames使用golang進行編寫,易於使用,快速且輕量級的開源分散式遊戲伺服器框架
Pitaya使用etcd作為預設的服務發現元件,提供使用nats和grpc進行遠端呼叫(server to server)的可選配置,並提供在docker中執行以上元件(etcd、nats)的docker-compose配置

抽象分析

  • PlayerConn
    PlayerConn是一個封裝的連線物件,繼承net.Conn,並提供一個獲取下一個資料包的方法

typePlayerConninterface{
GetNextMessage()(b[]byte,errerror)
net.Conn
}
  • Acceptor
    Acceptor代表一個服務端埠程序,接收客戶端連線,並用一個內部Chan來維護這些連線物件

typeAcceptorinterface{
ListenAndServe()
Stop()
GetAddr()string
GetConnChan()chanPlayerConn
}
  • Acceptorwrapper
    Acceptorwrapper義如其名就是Acceptor的包裝器,因為Acceptor的通過Chan來儲存連線
    所以wrapper可以通過遍歷這個Chan來實時包裝這些連線

typeWrapperinterface{
Wrap(acceptor.Acceptor)acceptor.Acceptor
}
  • Agent
    Agent是一個服務端的應用層連線物件,包含了:
    Session資訊
    伺服器預傳送訊息佇列
    拆解包物件
    最後心跳時間
    停止傳送心跳的chan
    關閉傳送資料的chan
    全域性的關閉訊號
    連線物件
    Agent當前狀態
    … ..

type(
//AgentcorrespondstoauserandisusedforstoringrawConninformation
Agentstruct{
Session*session.Session//session
appDieChanchanbool//appdiechannel
chDiechanstruct{}//waitforclose
chSendchanpendingWrite//pushmessagequeue
chStopHeartbeatchanstruct{}//stopheartbeats
chStopWritechanstruct{}//stopwritingmessages
closeMutexsync.Mutex
connnet.Conn//low-levelconnfd
decodercodec.PacketDecoder//binarydecoder
encodercodec.PacketEncoder//binaryencoder
heartbeatTimeouttime.Duration
lastAtint64//lastheartbeatunixtimestamp
messageEncodermessage.Encoder
......
stateint32//currentagentstate
}

pendingWritestruct{
ctxcontext.Context
data[]byte
errerror
}
)
  • Component
    Component代表業務元件,提供若干個介面
    通過Component生成處理請求的Service

typeComponentinterface{
Init()
AfterInit()
BeforeShutdown()
Shutdown()
}
  • Handler、Remote、Service
    Handler和Remote分別代表本地邏輯執行器和遠端邏輯執行器
    Service是一組服務物件,包含若干Handler和Remote
    這裡有個溫柔的細節——Receiver reflect.Value
    pitaya的設計者為了降低引用,採取在邏輯執行器中保留方法的Receiver以達到在Handler和Remote物件中,只需要儲存型別的Method,而無需儲存帶物件引用的Value.Method

type(
//Handlerrepresentsamessage.Message'shandler'smetainformation.
Handlerstruct{
Receiverreflect.Value//receiverofmethod
Methodreflect.Method//methodstub
Typereflect.Type//low-leveltypeofmethod
IsRawArgbool//whetherthedataneedtoserialize
MessageTypemessage.Type//handlerallowedmessagetype(eitherrequestornotify)
}

//Remoterepresentsremote'smetainformation.
Remotestruct{
Receiverreflect.Value//receiverofmethod
Methodreflect.Method//methodstub
HasArgsbool//ifremotehasnoargswewon'ttrytoserializereceiveddataintoarguments
Typereflect.Type//low-leveltypeofmethod
}

//Serviceimplementsaspecificservice,someofit'smethodswillbe
//calledwhenthecorrespondeventsisoccurred.
Servicestruct{
Namestring//nameofservice
Typereflect.Type//typeofthereceiver
Receiverreflect.Value//receiverofmethodsfortheservice
Handlersmap[string]*Handler//registeredmethods
Remotesmap[string]*Remote//registeredremotemethods
Optionsoptions//options
}
)
  • Modules
    Modules模組和Component結構一致,唯一的區別在於使用上
    Modules主要是面向系統的一些全域性存活的物件
    方便在統一的時機,集中進行啟動和關閉

typeBasestruct{}

func(c*Base)Init()error{
returnnil
}

func(c*Base)AfterInit(){}

func(c*Base)BeforeShutdown(){}

func(c*Base)Shutdown()error{
returnnil
}

集中管理的物件容器在外部module.go中定義

var(
modulesMap=make(map[string]interfaces.Module)
modulesArr=[]moduleWrapper{}
)

typemoduleWrapperstruct{
moduleinterfaces.Module
namestring
}
  • HandleService
    HandleService就是服務端的主邏輯物件,負責處理一切資料包
    chLocalProcess用於儲存待處理的客戶端資料包
    chRemoteProcess用於儲存待處理的來自其他伺服器的資料包
    services註冊了處理客戶端的服務
    內部聚合一個RemoteService物件,專門負責處理伺服器間的資料包

type(
HandlerServicestruct{
appDieChanchanbool//diechannelapp
chLocalProcesschanunhandledMessage//channelofmessagesthatwillbeprocessedlocally
chRemoteProcesschanunhandledMessage//channelofmessagesthatwillbeprocessedremotely
decodercodec.PacketDecoder//binarydecoder
encodercodec.PacketEncoder//binaryencoder
heartbeatTimeouttime.Duration
messagesBufferSizeint
remoteService*RemoteService
serializerserialize.Serializer//messageserializer
server*cluster.Server//serverobj
servicesmap[string]*component.Service//allregisteredservice
messageEncodermessage.Encoder
metricsReporters[]metrics.Reporter
}

unhandledMessagestruct{
ctxcontext.Context
agent*agent.Agent
route*route.Route
msg*message.Message
}
)
  • RemoteService
    RemoteService中維護服務發現和註冊提供的遠端服務

typeRemoteServicestruct{
rpcServercluster.RPCServer
serviceDiscoverycluster.ServiceDiscovery
serializerserialize.Serializer
encodercodec.PacketEncoder
rpcClientcluster.RPCClient
servicesmap[string]*component.Service//allregisteredservice
router*router.Router
messageEncodermessage.Encoder
server*cluster.Server//serverobj
remoteBindingListeners[]cluster.RemoteBindingListener
}
  • Timer
    Timer模組中維護一個全域性定時任務管理者,使用執行緒安全的map來儲存定時任務,通過time.Ticker的chan訊號來定期觸發

var(
Manager=&struct{
incrementIDint64
timerssync.Map
ChClosingTimerchanint64
ChCreatedTimerchan*Timer
}{}

Precision=time.Second

GlobalTicker*time.Ticker
)
  • pipeline
    pipeline模組提供全域性鉤子函式的配置
    BeforeHandler 在業務邏輯之前執行
    AfterHandler 在業務邏輯之後執行

var(
BeforeHandler=&pipelineChannel{}
AfterHandler=&pipelineAfterChannel{}
)

type(
HandlerTemplfunc(ctxcontext.Context,ininterface{})(outinterface{},errerror)AfterHandlerTemplfunc(ctxcontext.Context,outinterface{},errerror)(interface{},error)pipelineChannelstruct{
Handlers[]HandlerTempl
}

pipelineAfterChannelstruct{
Handlers[]AfterHandlerTempl
}
)

框架流程

app.go是系統啟動的入口
建立HandlerService
並根據啟動模式如果是叢集模式建立RemoteService
開啟服務端事件監聽
開啟監聽伺服器關閉訊號的Chan

var(
app=&App{
.....
}

remoteService*service.RemoteService
handlerService*service.HandlerService
)
funcStart(){
.....
ifapp.serverMode==Cluster{
.....
app.router.SetServiceDiscovery(app.serviceDiscovery)

remoteService=service.NewRemoteService(
app.rpcClient,
app.rpcServer,
app.serviceDiscovery,
app.router,
.....
)

app.rpcServer.SetPitayaServer(remoteService)

initSysRemotes()
}

handlerService=service.NewHandlerService(
app.dieChan,
app.heartbeat,
app.server,
remoteService,
.....
)

.....

listen()
.....
//stopserver
select{
caselogger.Log.Warn("theappwillshutdowninafewseconds")
cases:=logger.Log.Warn("gotsignal:",s,",shuttingdown...")
close(app.dieChan)
}
.....
}

listen方法也就是開啟服務,具體包括以下步驟:
1.註冊Component
2.註冊定時任務的GlobalTicker
3.開啟Dispatch處理業務和定時任務(ticket)的goroutine
4.開啟acceptor處理連線的goroutine
5.開啟主邏輯的goroutine
6.註冊Modules

funclisten(){
startupComponents()

timer.GlobalTicker=time.NewTicker(timer.Precision)

logger.Log.Infof("startingserver%s:%s",app.server.Type,app.server.ID)
fori:=0;i"pitaya.concurrency.handler.dispatch");i++{
gohandlerService.Dispatch(i)
}
for_,acc:=rangeapp.acceptors{
a:=acc
gofunc(){
forconn:=rangea.GetConnChan(){
gohandlerService.Handle(conn)
}
}()

gofunc(){
a.ListenAndServe()
}()

logger.Log.Infof("listeningwithacceptor%sonaddr%s",reflect.TypeOf(a),a.GetAddr())
}
.....
startModules()

logger.Log.Info("allmodulesstarted!")

app.running=true
}

startupComponents對Component進行初始化
然後把Component註冊到handlerService和remoteService上

funcstartupComponents(){
//componentinitializehooks
for_,c:=rangehandlerComp{
c.comp.Init()
}

//componentafterinitializehooks
for_,c:=rangehandlerComp{
c.comp.AfterInit()
}

//registerallcomponents
for_,c:=rangehandlerComp{
iferr:=handlerService.Register(c.comp,c.opts);err!=nil{
logger.Log.Errorf("Failedtoregisterhandler:%s",err.Error())
}
}

//registerallremotecomponents
for_,c:=rangeremoteComp{
ifremoteService==nil{
logger.Log.Warn("registeredaremotecomponentbutremoteServiceisnotrunning!skipping...")
}else{
iferr:=remoteService.Register(c.comp,c.opts);err!=nil{
logger.Log.Errorf("Failedtoregisterremote:%s",err.Error())
}
}
}
.....
}

比如HandlerService的註冊,反射得到component型別的全部方法,判斷isHandlerMethod就加入services裡面
並聚合Component物件的反射Value物件為全部Handler的Method Receiver,減少了物件引用

funcNewService(compComponent,opts[]Option)*Service{
s:=&Service{
Type:reflect.TypeOf(comp),
Receiver:reflect.ValueOf(comp),
}
.....
returns
}

func(h*HandlerService)Register(compcomponent.Component,opts[]component.Option)error{
s:=component.NewService(comp,opts)
.....
iferr:=s.ExtractHandler();err!=nil{
returnerr
}

h.services[s.Name]=s
forname,handler:=ranges.Handlers{
handlers[fmt.Sprintf("%s.%s",s.Name,name)]=handler
}
returnnil
}
func(s*Service)ExtractHandler()error{
typeName:=reflect.Indirect(s.Receiver).Type().Name()
.....
s.Handlers=suitableHandlerMethods(s.Type,s.Options.nameFunc)
.....
fori:=ranges.Handlers{
s.Handlers[i].Receiver=s.Receiver
}
returnnil
}
funcsuitableHandlerMethods(typreflect.Type,nameFuncfunc(string)string)map[string]*Handler{
methods:=make(map[string]*Handler)
form:=0;mmethod:=typ.Method(m)
mt:=method.Type
mn:=method.Name
ifisHandlerMethod(method){
.....
handler:=&Handler{
Method:method,
IsRawArg:raw,
MessageType:msgType,
}
.....
methods[mn]=handler
}
}
returnmethods
}

handlerService.Dispatch方法負責各種業務的處理,包括:
1.處理chLocalProcess中的本地Message
2.使用remoteService處理chRemoteProcess中的遠端Message
3.在定時ticket到達時呼叫timer.Cron執行定時任務
4.管理定時任務的建立
5.管理定時任務的刪除

func(h*HandlerService)Dispatch(threadint){
defertimer.GlobalTicker.Stop()

for{
select{
caselm:=metrics.ReportMessageProcessDelayFromCtx(lm.ctx,h.metricsReporters,"local")
h.localProcess(lm.ctx,lm.agent,lm.route,lm.msg)

caserm:=metrics.ReportMessageProcessDelayFromCtx(rm.ctx,h.metricsReporters,"remote")
h.remoteService.remoteProcess(rm.ctx,nil,rm.agent,rm.route,rm.msg)

case//executecrontask
timer.Cron()

caset:=//newTimers
timer.AddTimer(t)

caseid:=//closingTimers
timer.RemoveTimer(id)
}
}
}

接下來看看Acceptor的工作,以下為Tcp實現,就是負責接收連線,流入acceptor的Chan

func(a*TCPAcceptor)ListenAndServe(){
ifa.hasTLSCertificates(){
a.ListenAndServeTLS(a.certFile,a.keyFile)
return
}

listener,err:=net.Listen("tcp",a.addr)
iferr!=nil{
logger.Log.Fatalf("Failedtolisten:%s",err.Error())
}
a.listener=listener
a.running=true
a.serve()
}
func(a*TCPAcceptor)serve(){
defera.Stop()
fora.running{
conn,err:=a.listener.Accept()
iferr!=nil{
logger.Log.Errorf("FailedtoacceptTCPconnection:%s",err.Error())
continue
}

a.connChanConn:conn,
}
}
}

前面講過對於每個Acceptor開啟了一個goroutine去處理連線,也就是下面程式碼

forconn:=rangea.GetConnChan(){
gohandlerService.Handle(conn)
}

所以流入Chan的連線就會被實時的開啟一個goroutine去處理,處理過程就是先建立一個Agent物件
並開啟一個goroutine給Agent負責維護連線的心跳
然後開啟死迴圈,讀取連線的資料processPacket

func(h*HandlerService)Handle(connacceptor.PlayerConn){
//createaclientagentandstartupwritegoroutine
a:=agent.NewAgent(conn,h.decoder,h.encoder,h.serializer,h.heartbeatTimeout,h.messagesBufferSize,h.appDieChan,h.messageEncoder,h.metricsReporters)

//startupagentgoroutine
goa.Handle()
.....
for{
msg,err:=conn.GetNextMessage()

iferr!=nil{
logger.Log.Errorf("Errorreadingnextavailablemessage:%s",err.Error())
return
}

packets,err:=h.decoder.Decode(msg)
iferr!=nil{
logger.Log.Errorf("Failedtodecodemessage:%s",err.Error())
return
}

iflen(packets)1{
logger.Log.Warnf("Readnopackets,data:%v",msg)
continue
}

//processallpacket
fori:=rangepackets{
iferr:=h.processPacket(a,packets[i]);err!=nil{
logger.Log.Errorf("Failedtoprocesspacket:%s",err.Error())
return
}
}
}
}

這時如果使用了pitaya提供的漏桶演算法實現的限流wrap來包裝acceptor,則會對客戶端傳送的訊息進行限流限速
這裡也是靈活利用for迴圈遍歷chan的特性,所以也是實時地對連線進行包裝

func(b*BaseWrapper)ListenAndServe(){
gob.pipe()
b.Acceptor.ListenAndServe()
}

//GetConnChanreturnsthewrapperconnchan
func(b*BaseWrapper)GetConnChan()chanacceptor.PlayerConn{
returnb.connChan
}

func(b*BaseWrapper)pipe(){
forconn:=rangeb.Acceptor.GetConnChan(){
b.connChan}
}
typeRateLimitingWrapperstruct{
BaseWrapper
}

funcNewRateLimitingWrapper(c*config.Config)*RateLimitingWrapper{
r:=&RateLimitingWrapper{}
r.BaseWrapper=NewBaseWrapper(func(connacceptor.PlayerConn)acceptor.PlayerConn{
.....
returnNewRateLimiter(conn,limit,interval,forceDisable)
})
returnr
}

func(r*RateLimitingWrapper)Wrap(aacceptor.Acceptor)acceptor.Acceptor{
r.Acceptor=a
returnr
}

func(r*RateLimiter)GetNextMessage()(msg[]byte,errerror){
ifr.forceDisable{
returnr.PlayerConn.GetNextMessage()
}

for{
msg,err:=r.PlayerConn.GetNextMessage()
iferr!=nil{
returnnil,err
}

now:=time.Now()
ifr.shouldRateLimit(now){
logger.Log.Errorf("Data=%s,Error=%s",msg,constants.ErrRateLimitExceeded)
metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
continue
}

returnmsg,err
}
}

processPacket對資料包解包後,執行processMessage

func(h*HandlerService)processPacket(a*agent.Agent,p*packet.Packet)error{
switchp.Type{
casepacket.Handshake:
.....
casepacket.HandshakeAck:
.....
casepacket.Data:
ifa.GetStatus()returnfmt.Errorf("receivedataonsocketwhichisnotyetACK,sessionwillbeclosedimmediately,remote=%s",
a.RemoteAddr().String())
}
msg,err:=message.Decode(p.Data)
iferr!=nil{
returnerr
}
h.processMessage(a,msg)
casepacket.Heartbeat:
//expected
}
a.SetLastAt()
returnnil
}

processMessage中包裝資料包為unHandledMessage
根據訊息型別,流入chLocalProcess 或者chRemoteProcess 也就轉交給上面提到的負責Dispatch的goroutine去處理了

func(h*HandlerService)processMessage(a*agent.Agent,msg*message.Message){
requestID:=uuid.New()
ctx:=pcontext.AddToPropagateCtx(context.Background(),constants.StartTimeKey,time.Now().UnixNano())
ctx=pcontext.AddToPropagateCtx(ctx,constants.RouteKey,msg.Route)
ctx=pcontext.AddToPropagateCtx(ctx,constants.RequestIDKey,requestID.String())
tags:=opentracing.Tags{
"local.id":h.server.ID,
"span.kind":"server",
"msg.type":strings.ToLower(msg.Type.String()),
"user.id":a.Session.UID(),
"request.id":requestID.String(),
}
ctx=tracing.StartSpan(ctx,msg.Route,tags)
ctx=context.WithValue(ctx,constants.SessionCtxKey,a.Session)

r,err:=route.Decode(msg.Route)
.....
message:=unhandledMessage{
ctx:ctx,
agent:a,
route:r,
msg:msg,
}
ifr.SvType==h.server.Type{
h.chLocalProcess}else{
ifh.remoteService!=nil{
h.chRemoteProcess}else{
logger.Log.Warnf("requestmadetoanotherservertypebutnoremoteServicerunning")
}
}
}

伺服器程序啟動的最後一步是對全域性模組啟動

在外部的module.go檔案中,提供了對module的全域性註冊方法、全部順序啟動方法、全部順序關閉方法

funcRegisterModule(moduleinterfaces.Module,namestring)error{
.....
}

funcstartModules(){
for_,modWrapper:=rangemodulesArr{
modWrapper.module.Init()
}
for_,modWrapper:=rangemodulesArr{
modWrapper.module.AfterInit()
}
}

funcshutdownModules(){
fori:=len(modulesArr)-1;i>=0;i--{
modulesArr[i].module.BeforeShutdown()
}

fori:=len(modulesArr)-1;i>=0;i--{
mod:=modulesArr[i].module
mod.Shutdown()
}
}

處理細節

  • localProcess
    接下來看看localprocess對於訊息的處理細節(為了直觀省略部分異常處理程式碼)
    使用processHandlerMessagef方法對包裝出來的ctx物件進行業務操作
    最終根據訊息的型別 notify / Request 區分是否需要響應,執行不同處理

func(h*HandlerService)localProcess(ctxcontext.Context,a*agent.Agent,route*route.Route,msg*message.Message){
varmiduint
switchmsg.Type{
casemessage.Request:
mid=msg.ID
casemessage.Notify:
mid=0
}

ret,err:=processHandlerMessage(ctx,route,h.serializer,a.Session,msg.Data,msg.Type,false)
ifmsg.Type!=message.Notify{
.....
err:=a.Session.ResponseMID(ctx,mid,ret)
.....
}else{
metrics.ReportTimingFromCtx(ctx,h.metricsReporters,handlerType,nil)
tracing.FinishSpan(ctx,err)
}
}
  • processHandlerMessage
    這裡面負進行業務邏輯
    會先呼叫executeBeforePipeline(ctx, arg),執行前置的鉤子函式
    再通過util.Pcall(h.Method, args)反射呼叫handler方法
    再呼叫executeAfterPipeline(ctx, resp, err),執行後置的鉤子函式
    最後呼叫serializeReturn(serializer, resp),對請求結果進行序列化

funcprocessHandlerMessage(
ctxcontext.Context,
rt*route.Route,
serializerserialize.Serializer,
session*session.Session,
data[]byte,
msgTypeIfaceinterface{},
remotebool,
)([]byte,error){
ifctx==nil{
ctx=context.Background()
}
ctx=context.WithValue(ctx,constants.SessionCtxKey,session)
ctx=util.CtxWithDefaultLogger(ctx,rt.String(),session.UID())

h,err:=getHandler(rt)
.....

msgType,err:=getMsgType(msgTypeIface)
.....

logger:=ctx.Value(constants.LoggerCtxKey).(logger.Logger)
exit,err:=h.ValidateMessageType(msgType)
.....

arg,err:=unmarshalHandlerArg(h,serializer,data)
.....

ifarg,err=executeBeforePipeline(ctx,arg);err!=nil{
returnnil,err
}
.....

args:=[]reflect.Value{h.Receiver,reflect.ValueOf(ctx)}
ifarg!=nil{
args=append(args,reflect.ValueOf(arg))
}

resp,err:=util.Pcall(h.Method,args)
ifremote&&msgType==message.Notify{
resp=[]byte("ack")
}

resp,err=executeAfterPipeline(ctx,resp,err)
.....

ret,err:=serializeReturn(serializer,resp)
.....

returnret,nil
}
  • executeBeforePipeline
    實際就是執行pipeline的BeforeHandler

funcexecuteBeforePipeline(ctxcontext.Context,datainterface{})(interface{},error){
varerrerror
res:=data
iflen(pipeline.BeforeHandler.Handlers)>0{
for_,h:=rangepipeline.BeforeHandler.Handlers{
res,err=h(ctx,res)
iferr!=nil{
logger.Log.Debugf("pitaya/handler:brokenpipeline:%s",err.Error())
returnres,err
}
}
}
returnres,nil
}
  • executeAfterPipeline
    實際就是執行pipeline的AfterHandler

funcexecuteAfterPipeline(ctxcontext.Context,resinterface{},errerror)(interface{},error){
ret:=res
iflen(pipeline.AfterHandler.Handlers)>0{
for_,h:=rangepipeline.AfterHandler.Handlers{
ret,err=h(ctx,ret,err)
}
}
returnret,err
}

util.pcall裡展示了golang反射的一種高階用法
method.Func.Call,第一個引數是Receiver,也就是呼叫物件方法的例項
這種設計對比直接儲存Value物件的method,反射時直接call,擁有的額外好處就是降低了物件引用,方法不和例項繫結

funcPcall(methodreflect.Method,args[]reflect.Value)(retsinterface{},errerror){
.....
r:=method.Func.Call(args)
iflen(r)==2{
ifv:=r[1].Interface();v!=nil{
err=v.(error)
}elseif!r[0].IsNil(){
rets=r[0].Interface()
}else{
err=constants.ErrReplyShouldBeNotNull
}
}
return
}
f744f094bac33a4b1df132e45b4a806c.png
更多資料,請搜尋公眾號歪歪梯Club