thrift golang 解析
RPC
什麼是
RPC
?
RPC
全稱為Remote Procedure Call
翻譯過來就是遠端過程呼叫
RPC
和HTTP
的區別
HTTP
是一種協議,RPC
可以通過HTTP
來實現,也可以通過Socket
自己實現一套協議來實現。
論複雜度,RPC
框架肯定是高於簡單的HTTP
介面的。但毋庸置疑,HTTP
介面由於受限於HTTP
協議,需要帶HTTP
請求頭,導致傳輸起來效率或者說安全性不如RPC
並且要否認一點,HTTP
協議相對於TCP
報文協議,增加的開銷在於連線與斷開。HTTP
是支援連線池複用的(HTTP 1.x)
Thrift架構
Apache Thrift
是一個跨語言的服務框架,本質上為RPC
Thrift
包含一個完整的堆疊結構用於構建客戶端和伺服器端。
傳輸協議(TProtocol
)
Thrift
可以讓使用者選擇客戶端和服務端之間傳輸通訊協議的區別,在傳輸協議上總體分為文字和二進位制(binary
)傳輸協議,為了節省頻寬,提高傳輸效率,一般情況下使用二進位制型別的傳輸協議為多數。
-
TBinaryProtocol
:二進位制編碼格式進行資料傳輸 -
TCompactProtocol
:高效率的、密集的二進位制編碼格式進行資料傳輸 -
TJSONProtocol
:使用JSON
的資料編碼協議進行資料傳輸 -
TDebugProtocol
:使用易懂的可讀文字格式,以便於debug
資料傳輸方式(TTransport
)
TTransport
是與底層資料傳輸緊密相關的傳輸層。每一種支援的底層傳輸方式都存在一個與之對應的TTransport
。在這一層,資料是按位元組流處理的,即傳輸層看到的是一個又一個的位元組,並把這些位元組按順序傳送和接收。TTransport
並不瞭解它所傳輸的資料是什麼型別,實際上傳輸層也不關心資料是什麼型別,只需要按照位元組方式對資料進行傳送和接收即可。資料型別的解析在TProtocol
這一層完成。
-
TSocket
:使用阻塞式I/O
進行傳輸,是最常見的模式 -
THttpTransport
:採用HTTP
協議進行資料傳輸 -
TFramedTransPort
frame
為單位進行傳輸,非阻塞式服務中使用; -
TFileTransPort
:以檔案形式進行傳輸 -
TMemoryTransport
:將記憶體用於I/O
傳輸 -
TZlibTransport
:使用zlib
進行壓縮, 與其他傳輸方式聯合使用 -
TBufferedTransport
對某個transport
物件操作的資料進行buffer
,即從buffer
中讀取資料進行傳輸,或將資料直接寫入到buffer
服務端網路模型(TServer
)
TServer
在thrift
框架中的主要任務是接收client
請求,並轉發到某個processor
上進行請求處理。針對不同的訪問規模,thrift
提供了不同TServer
模型。thrift
目前支援的server
模型包括:
-
TSimpleServer
: 單執行緒伺服器端使用標準的阻塞式I/O
-
TTHreaadPoolServer
: 多執行緒伺服器端使用標準的阻塞式I/O
-
TNonblockingServer
:多執行緒伺服器端使用非阻塞式I/O
-
TThreadedServer
:多執行緒網路模型,使用阻塞式I/O
,為每個請求建立一個執行緒
對於`golang`來說,只有`TSimpleServer`服務模式,並且是非阻塞的
TProcesser
TProcessor
主要對TServer
中一次請求的inputProtocol
和outputProtocol
進行操作,也就是從inputProtocol
中讀出client
的請求資料,向outputProtocol
寫入使用者邏輯的返回值。TProcessorprocess
是一個非常關鍵的處理函式,因為client
所有的rpc
呼叫都會經過該函式處理並轉發
ThriftClient
ThriftClient
跟TProcessor
一樣主要操作inputProtocol
和outputProtocol
,不同的是thriftClient
將rpc
呼叫分為send
和receive
兩個步驟:
-
send
步驟,將使用者的呼叫引數作為一個整體的struct
寫入TProtocol
,併傳送到TServer
。 -
send
結束後,thriftClient
便立即進入receive
狀態等待TServer
的響應。對於TServer
的響應,使用返回值解析類進行返回值解析,完成rpc
呼叫。
TSimpleServer服務模式
實際上這不是典型的TSimpleServer
,因為它在接受套接字後沒有被阻塞。
它更像是一個TThreadedServer
,可以處理不同goroutine
中的不同連線。
如果golang
使用者在客戶端實現conn-pool
之類的東西這將有效。
type TSimpleServer struct {
quit chan struct{} // 採用阻塞channel進行判斷
processorFactory TProcessorFactory
serverTransport TServerTransport
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
}
複製程式碼
以下程式碼thrift-idl
為,接下來的解析以此為例
namespace go echo
struct EchoReq {
1: string msg;
}
struct EchoRes {
1: string msg;
}
service Echo {
EchoRes echo(1: EchoReq req);
}
複製程式碼
服務端Server程式碼
func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}
func (p *TSimpleServer) AcceptLoop() error {
for {
// 此處的Accept()是阻塞的,是呼叫listener.Accept()
client,err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:",err)
}
}()
}
}
}
複製程式碼
如果server
此時還在處理請求,服務端突然重啟,thrift 1.0
是無法做到優雅重啟的,但是go thrift
的最新版本採用了golang waitgroup
的方式實現了優雅重啟~
func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s",e,debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok,err := processor.Process(inputProtocol,outputProtocol)
if err,ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s",err)
return err
}
if err,ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}
複製程式碼
Process
處理邏輯
func (p *EchoProcessor) Process(iprot,oprot thrift.TProtocol) (success bool,err thrift.TException) {
name,_,seqId,err := iprot.ReadMessageBegin()
if err != nil {
return false,err
}
// 獲取傳遞過來的name,如果存在則處理
if processor,ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId,iprot,oprot)
}
// 異常邏輯
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD,"Unknown function "+name)
oprot.WriteMessageBegin(name,thrift.EXCEPTION,seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false,x3
}
複製程式碼
TServer
接收到rpc
請求之後,呼叫TProcessorprocess
進行處理。
TProcessorprocess
首先呼叫TTransport.readMessageBegin
介面,讀出rpc
呼叫的名稱和rpc
呼叫型別。
如果rpc
呼叫型別是rpc call
,則呼叫TProcessor.process_fn
繼續處理,對於未知的rpc
呼叫型別,則丟擲異常。
TProcessor.process_fn
根據rpc
呼叫名稱,到自己的processMap
中查詢對應的rpc
處理函式。如果存在對應的rpc
處理函式,則呼叫該處理函式繼續進行請求響應。不存在則丟擲異常。
func (p *echoProcessorEcho) Process(seqId int32,err thrift.TException) {
args := EchoEchoArgs{}
// 讀取入參的引數
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR,err.Error())
oprot.WriteMessageBegin("echo",seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false,err
}
iprot.ReadMessageEnd()
result := EchoEchoResult{}
var retval *EchoRes
var err2 error
// 此處是thrift為什麼err不能傳錯誤,如果傳業務錯誤會被阻塞
if retval,err2 = p.handler.Echo(args.Req); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR,"Internal error processing echo: "+err2.Error())
oprot.WriteMessageBegin("echo",seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true,err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("echo",thrift.REPLY,seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true,err
}
複製程式碼
服務端stop
程式碼
var once sync.Once
func (p *TSimpleServer) Stop() error {
q := func() {
p.quit <- struct{}{}
p.serverTransport.Interrupt()
}
once.Do(q)
return nil
}
複製程式碼
stop
函式比較簡單,可以看出直接向阻塞佇列裡面寫入資料,然後server
不再接受請求
客戶端程式碼
Client
呼叫的函式
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes,err error) {
if err = p.sendEcho(req); err != nil {
return
}
return p.recvEcho()
}
複製程式碼
sendEcho()
函式
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
// seqid + 1
p.SeqId++
if err = oprot.WriteMessageBegin("echo",thrift.CALL,p.SeqId); err != nil {
return
}
// 構建引數
args := EchoEchoArgs{
Req: req,}
if err = args.Write(oprot); err != nil {
return
}
// 通知伺服器傳送完畢
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
複製程式碼
recvEcho()
函式
func (p *EchoClient) recvEcho() (value *EchoRes,err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
//
method,mTypeId,err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "echo" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME,"echo failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID,"echo failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION,"Unknown Exception")
var error1 error
error1,err = error0.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error1
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION,"echo failed: invalid message type")
return
}
result := EchoEchoResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}
複製程式碼
thrift在mac機器安裝問題
- 問題1:
go get git.apache.org/thrift.git/lib/go/thrift
失敗 - 問題2:直接使用
github.com
提供的版本會報未知錯誤
問題2需要根據你的thrift -version
來判斷下載哪一個版本的thrift
,比如我的thrift版本是0.10.0
那麼需要下載的thrift
地址為https://github.com/apache/thrift/archive/0.10.0.zip
手動建立mkdir -p git.apache.org/thrift.git/lib/go/
目錄,然後將下載後的go
檔案移至該目錄下~