1. 程式人生 > 程式設計 >thrift golang 解析

thrift golang 解析

RPC

什麼是RPC?

RPC全稱為Remote Procedure Call 翻譯過來就是遠端過程呼叫

RPCHTTP的區別

HTTP是一種協議,RPC可以通過HTTP來實現,也可以通過Socket自己實現一套協議來實現。

論複雜度,RPC框架肯定是高於簡單的HTTP介面的。但毋庸置疑,HTTP介面由於受限於HTTP協議,需要帶HTTP請求頭,導致傳輸起來效率或者說安全性不如RPC

並且要否認一點,HTTP協議相對於TCP報文協議,增加的開銷在於連線與斷開。HTTP是支援連線池複用的(HTTP 1.x)

Thrift架構

Apache Thrift是一個跨語言的服務框架,本質上為RPC

,同時具有序列化、反序列化機制Thrift包含一個完整的堆疊結構用於構建客戶端和伺服器端。

傳輸協議(TProtocol)

Thrift可以讓使用者選擇客戶端和服務端之間傳輸通訊協議的區別,在傳輸協議上總體分為文字和二進位制(binary)傳輸協議,為了節省頻寬,提高傳輸效率,一般情況下使用二進位制型別的傳輸協議為多數。

  • TBinaryProtocol:二進位制編碼格式進行資料傳輸
  • TCompactProtocol:高效率的、密集的二進位制編碼格式進行資料傳輸
  • TJSONProtocol:使用 JSON 的資料編碼協議進行資料傳輸
  • TDebugProtocol:使用易懂的可讀文字格式,以便於debug

資料傳輸方式(TTransport)

TTransport是與底層資料傳輸緊密相關的傳輸層。每一種支援的底層傳輸方式都存在一個與之對應的TTransport。在這一層,資料是按位元組流處理的,即傳輸層看到的是一個又一個的位元組,並把這些位元組按順序傳送和接收。TTransport並不瞭解它所傳輸的資料是什麼型別,實際上傳輸層也不關心資料是什麼型別,只需要按照位元組方式對資料進行傳送和接收即可。資料型別的解析在TProtocol這一層完成。

  • TSocket:使用阻塞式 I/O進行傳輸,是最常見的模式
  • THttpTransport:採用HTTP協議進行資料傳輸
  • TFramedTransPort
    : 以frame為單位進行傳輸,非阻塞式服務中使用;
  • TFileTransPort:以檔案形式進行傳輸
  • TMemoryTransport:將記憶體用於I/O傳輸
  • TZlibTransport:使用zlib進行壓縮, 與其他傳輸方式聯合使用
  • TBufferedTransport對某個transport物件操作的資料進行buffer,即從buffer中讀取資料進行傳輸,或將資料直接寫入到buffer

服務端網路模型(TServer)

TServerthrift框架中的主要任務是接收client請求,並轉發到某個processor上進行請求處理。針對不同的訪問規模,thrift提供了不同TServer模型。thrift目前支援的server模型包括:

  • TSimpleServer: 單執行緒伺服器端使用標準的阻塞式I/O
  • TTHreaadPoolServer: 多執行緒伺服器端使用標準的阻塞式I/O
  • TNonblockingServer:多執行緒伺服器端使用非阻塞式I/O
  • TThreadedServer:多執行緒網路模型,使用阻塞式I/O,為每個請求建立一個執行緒

對於`golang`來說,只有`TSimpleServer`服務模式,並且是非阻塞的

TProcesser

TProcessor主要對TServer中一次請求的inputProtocoloutputProtocol進行操作,也就是從inputProtocol中讀出client的請求資料,向outputProtocol寫入使用者邏輯的返回值。TProcessorprocess是一個非常關鍵的處理函式,因為client所有的rpc呼叫都會經過該函式處理並轉發

ThriftClient

ThriftClientTProcessor一樣主要操作inputProtocoloutputProtocol,不同的是thriftClientrpc呼叫分為sendreceive兩個步驟:

  • send步驟,將使用者的呼叫引數作為一個整體的struct寫入TProtocol,併傳送到TServer
  • send結束後,thriftClient便立即進入receive狀態等待TServer的響應。對於TServer的響應,使用返回值解析類進行返回值解析,完成rpc呼叫。

TSimpleServer服務模式

實際上這不是典型的TSimpleServer因為它在接受套接字後沒有被阻塞。 它更像是一個TThreadedServer,可以處理不同goroutine中的不同連線。 如果golang使用者在客戶端實現conn-pool之類的東西這將有效。

type TSimpleServer struct {
	quit chan struct{}     // 採用阻塞channel進行判斷

	processorFactory       TProcessorFactory
	serverTransport        TServerTransport
	inputTransportFactory  TTransportFactory
	outputTransportFactory TTransportFactory
	inputProtocolFactory   TProtocolFactory
	outputProtocolFactory  TProtocolFactory
}

複製程式碼

以下程式碼thrift-idl為,接下來的解析以此為例

namespace go echo

struct EchoReq {
    1: string msg;
}

struct EchoRes {
    1: string msg;
}

service Echo {
    EchoRes echo(1: EchoReq req);
}
複製程式碼

服務端Server程式碼

func (p *TSimpleServer) Serve() error {
	err := p.Listen()
	if err != nil {
		return err
	}
	p.AcceptLoop()
	return nil
}

func (p *TSimpleServer) AcceptLoop() error {
	for {
	    // 此處的Accept()是阻塞的,是呼叫listener.Accept()
		client,err := p.serverTransport.Accept()
		if err != nil {
			select {
			case <-p.quit:
				return nil
			default:
			}
			return err
		}
		if client != nil {
			go func() {
				if err := p.processRequests(client); err != nil {
					log.Println("error processing request:",err)
				}
			}()
		}
	}
}
複製程式碼

如果server此時還在處理請求,服務端突然重啟,thrift 1.0是無法做到優雅重啟的,但是go thrift的最新版本採用了golang waitgroup的方式實現了優雅重啟~

func (p *TSimpleServer) processRequests(client TTransport) error {
	processor := p.processorFactory.GetProcessor(client)

	inputTransport := p.inputTransportFactory.GetTransport(client)
	outputTransport := p.outputTransportFactory.GetTransport(client)

	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
	defer func() {
		if e := recover(); e != nil {
			log.Printf("panic in processor: %s: %s",e,debug.Stack())
		}
	}()
	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		ok,err := processor.Process(inputProtocol,outputProtocol)

		if err,ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
			return nil
		} else if err != nil {
			log.Printf("error processing request: %s",err)
			return err
		}
		if err,ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
			continue
		}
 		if !ok {
			break
		}
	}
	return nil
}
複製程式碼

Process處理邏輯

func (p *EchoProcessor) Process(iprot,oprot thrift.TProtocol) (success bool,err thrift.TException) {
	name,_,seqId,err := iprot.ReadMessageBegin()
	if err != nil {
		return false,err
	}
	// 獲取傳遞過來的name,如果存在則處理
	if processor,ok := p.GetProcessorFunction(name); ok {
		return processor.Process(seqId,iprot,oprot)
	}
	// 異常邏輯
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD,"Unknown function "+name)
	oprot.WriteMessageBegin(name,thrift.EXCEPTION,seqId)
	x3.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false,x3

}
複製程式碼

TServer接收到rpc請求之後,呼叫TProcessorprocess進行處理。 TProcessorprocess首先呼叫TTransport.readMessageBegin介面,讀出rpc呼叫的名稱和rpc呼叫型別。

如果rpc呼叫型別是rpc call,則呼叫TProcessor.process_fn繼續處理,對於未知的rpc呼叫型別,則丟擲異常。 TProcessor.process_fn根據rpc呼叫名稱,到自己的processMap中查詢對應的rpc處理函式。如果存在對應的rpc處理函式,則呼叫該處理函式繼續進行請求響應。不存在則丟擲異常。

func (p *echoProcessorEcho) Process(seqId int32,err thrift.TException) {
	args := EchoEchoArgs{}
	// 讀取入參的引數
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR,err.Error())
		oprot.WriteMessageBegin("echo",seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false,err
	}

	iprot.ReadMessageEnd()

	result := EchoEchoResult{}
	var retval *EchoRes
	var err2 error
	// 此處是thrift為什麼err不能傳錯誤,如果傳業務錯誤會被阻塞
	if retval,err2 = p.handler.Echo(args.Req); err2 != nil {
		x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR,"Internal error processing echo: "+err2.Error())
		oprot.WriteMessageBegin("echo",seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true,err2
	} else {
		result.Success = retval
	}

	if err2 = oprot.WriteMessageBegin("echo",thrift.REPLY,seqId); err2 != nil {
		err = err2
	}

	if err2 = result.Write(oprot); err == nil && err2 != nil {
		err = err2
	}
	if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
		err = err2
	}

	if err2 = oprot.Flush(); err == nil && err2 != nil {
		err = err2
	}
	if err != nil {
		return
	}
	return true,err
}
複製程式碼

服務端stop程式碼

var once sync.Once
func (p *TSimpleServer) Stop() error {
	q := func() {
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	}
	once.Do(q)
	return nil
}
複製程式碼

stop函式比較簡單,可以看出直接向阻塞佇列裡面寫入資料,然後server不再接受請求

客戶端程式碼

Client呼叫的函式

func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes,err error) {

	if err = p.sendEcho(req); err != nil {
		return
	}

	return p.recvEcho()
}
複製程式碼

sendEcho()函式

func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	oprot := p.OutputProtocol
	if oprot == nil {
		oprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = oprot
	}
	// seqid + 1
	p.SeqId++

	if err = oprot.WriteMessageBegin("echo",thrift.CALL,p.SeqId); err != nil {
		return
	}

	// 構建引數
	args := EchoEchoArgs{
		Req: req,}

	if err = args.Write(oprot); err != nil {
		return
	}
	// 通知伺服器傳送完畢
	if err = oprot.WriteMessageEnd(); err != nil {
		return
	}
	return oprot.Flush()
}
複製程式碼

recvEcho()函式

func (p *EchoClient) recvEcho() (value *EchoRes,err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}
	//
	method,mTypeId,err := iprot.ReadMessageBegin()
	if err != nil {
		return
	}
	if method != "echo" {
		err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME,"echo failed: wrong method name")
		return
	}
	if p.SeqId != seqId {
		err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID,"echo failed: out of sequence response")
		return
	}
	if mTypeId == thrift.EXCEPTION {
		error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION,"Unknown Exception")
		var error1 error
		error1,err = error0.Read(iprot)
		if err != nil {
			return
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return
		}
		err = error1
		return
	}
	if mTypeId != thrift.REPLY {
		err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION,"echo failed: invalid message type")
		return
	}
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return
	}
	value = result.GetSuccess()
	return
}
複製程式碼

thrift在mac機器安裝問題

  • 問題1:go get git.apache.org/thrift.git/lib/go/thrift失敗
  • 問題2:直接使用github.com提供的版本會報未知錯誤

問題2需要根據你的thrift -version來判斷下載哪一個版本的thrift,比如我的thrift版本是0.10.0那麼需要下載的thrift地址為https://github.com/apache/thrift/archive/0.10.0.zip

手動建立mkdir -p git.apache.org/thrift.git/lib/go/目錄,然後將下載後的go檔案移至該目錄下~

Reference