Golang標準庫RPC實踐及改進
阿新 • • 發佈:2019-02-14
轉載自:http://daizuozhuo.github.io/golang-rpc-practice/
一直用Golang標準庫裡的的RPC package來進行遠端呼叫,簡單好用. 但是隨著任務數量的增大, 發現簡單的像包裡面的示例那樣的程式碼出現了各種各樣的問題,下面就把我踩過的一些坑記錄一下吧. 首先是最初使用的文件裡的版本,使用HTTP來發送請求.
server.go
func ListenRPC() {
rpc.Register(NewWorker())
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:", e)
}
go http.Serve(l, nil)
}
client.go
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
c, errx := rpc.DialHTTP("tcp", srv+":4200")
if errx != nil {
return fmt.Errorf("ConnectError: %s" , errx.Error())
}
defer c.Close()
return c.Call(rpcname, args, reply)
}
這樣四五臺機器的情況是夠用了, 但是後來叢集的機器增加到了十二臺, 當請求大了之後發現總有很多工卡住,通過call函式傳送任務之後總會有沒有返回的情況. 於是轉而直接用tcp,效率有很大提升.
server.go
func ListenRPC() {
rpc.Register(NewWorker())
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:" , e)
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Print("Error: accept rpc connection", err.Error())
continue
}
go rpc.ServeConn(conn)
}
}()
}
client.go
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
c, errx := rpc.Dial("tcp", srv+":4200")
if errx != nil {
return fmt.Errorf("ConnectError: %s", errx.Error())
}
defer c.Close()
return c.Call(rpcname, args, reply)
}
這樣局面有所改觀,但是還是有任務卡住,概率大概是0.01%, 也就是一萬個call裡會有一個沒有響應. 仔細研究後發現這個rpc package有兩大坑:
rpc包裡的rpc.Dial函式沒有timeout, 系統預設是沒有timeout的,所以在這裡可能卡住.所以我們可以採用net包裡的 net.DialTimeout函式.
rpc包裡預設使用gobCodec來編碼解碼, 這裡io可能會卡住而不返回錯誤,所以我們要自己編寫加入timeout的codec. 注意server這邊讀寫都有timeout,但是client這邊只有寫有timeout,因為讀的話並不能預知任務完成的時間. 於是就有了接下來這個版本的rpc,幾十萬個任務下來沒有任何問題.
完整的程式碼可以在在github rpc-example上下載.
server.go
func TimeoutCoder(f func(interface{}) error, e interface{}, msg string) error {
echan := make(chan error, 1)
go func() { echan <- f(e) }()
select {
case e := <-echan:
return e
case <-time.After(time.Minute):
return fmt.Errorf("Timeout %s", msg)
}
}
type gobServerCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
}
func (c *gobServerCodec) ReadRequestHeader(r *rpc.Request) error {
return TimeoutCoder(c.dec.Decode, r, "server read request header")
}
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return TimeoutCoder(c.dec.Decode, body, "server read request body")
}
func (c *gobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
if err = TimeoutCoder(c.enc.Encode, r, "server write response"); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding response:", err)
c.Close()
}
return
}
if err = TimeoutCoder(c.enc.Encode, body, "server write response body"); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding body:", err)
c.Close()
}
return
}
return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() error {
if c.closed {
// Only call c.rwc.Close once; otherwise the semantics are undefined.
return nil
}
c.closed = true
return c.rwc.Close()
}
func ListenRPC() {
rpc.Register(NewWorker())
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Print("Error: accept rpc connection", err.Error())
continue
}
go func(conn net.Conn) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
err = rpc.ServeRequest(srv)
if err != nil {
log.Print("Error: server rpc request", err.Error())
}
srv.Close()
}(conn)
}
}()
}
client.go
type gobClientCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
func (c *gobClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
if err = TimeoutCoder(c.enc.Encode, r, "client write request"); err != nil {
return
}
if err = TimeoutCoder(c.enc.Encode, body, "client write request body"); err != nil {
return
}
return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *rpc.Response) error {
return c.dec.Decode(r)
}
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
conn, err := net.DialTimeout("tcp", srv+":4200", time.Second*10)
if err != nil {
return fmt.Errorf("ConnectError: %s", err.Error())
}
encBuf := bufio.NewWriter(conn)
codec := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
c := rpc.NewClientWithCodec(codec)
err = c.Call(rpcname, args, reply)
errc := c.Close()
if err != nil && errc != nil {
return fmt.Errorf("%s %s", err, errc)
}
if err != nil {
return err
} else {
return errc
}
}