1. 程式人生 > >Etcd原始碼分析-網路模型進階篇

Etcd原始碼分析-網路模型進階篇

起初本篇打算介紹raft相關,但是後來發現,還是有必要再深入介紹一下網路模型。
一、基礎網路模型

        Etcd採用http(https)協議作為應用層協議,關於http協議介紹不是本篇範疇。大家都知道http一般情況下是無狀態協議,且網路是位請求+應答,當收到應答http session就結束了。但是在etcd中可能就不是這樣子了。下面是抓取的http報文:

紅色框,符合上面經典模型,請求+應答。
藍色框,只有請求,沒有應答。
對於上述藍色框中請求,為什麼沒有應答呢?難道不會超時嗎?

二、訊息流程

2.1 http handler

      上一篇介紹到,在使用net/http模組需要使用者自定義http handler(相當於http路由),針對不同http請求,定義不同handler。那麼對於etcd中和peer相關的handler有哪些呢?
        在檔案rafthttp/transport.go中:
func (t *Transport) Handler() http.Handler {
	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
	mux := http.NewServeMux() //http 請求路由
	mux.Handle(RaftPrefix, pipelineHandler) /* /raft */
	mux.Handle(RaftStreamPrefix+"/", streamHandler)  /* /raft/stream/ */
	mux.Handle(RaftSnapshotPrefix, snapHandler)      /* /raft/snapshot */
	mux.Handle(ProbingPrefix, probing.NewHandler())  /* /raft/probing */
	return mux
}

上面所有羅列出的handler並不是所有,只把相關介紹一下,我們只要知道,不同的url會有與之對應handler即可。

2.2 會話建立

    對於第一節中,我們以GET /raft/stream/message/8a840eaa4b694be1進行說明,因為這個是最複雜的。

2.2.1 報文

首先來看一下,傳送http報文,本端Ip為192.63.63.1,遠端Ip為192.63.63.30

名稱

含義

Host

服務端ip地址以及埠

X-Etcd-Cluster-Id

叢集id,每個etcd節點都會隨機生成

X-Min-Cluster-Version

叢集要求最低版本

X-Peerurls

告訴對端etcd節點,我(本端)監聽的peer地址是什麼

X-Raft-To

遠端etcd節點id

X-Server-Form

本端etcd節點id,用於標識唯一etcd節點。與url後面數字一致

2.2.2 會話建立

上面的報文,在哪裡構造出來?在哪裡發出去呢?流程圖如下:

rafthttp/http.go,會發現是ServeHTTP方法,這個方法在上一篇已經介紹!

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		w.Header().Set("Allow", "GET")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}
/*
* 忽略部分程式碼,這部分程式碼主要使用構造http頭部資訊
*/

/* 這個地方需要注意一下,此處並沒有包把應答報文發出去,但是具體處理邏輯需要參考net/http中Flush */
	w.WriteHeader(http.StatusOK)
	w.(http.Flusher).Flush()

/* 構造conn物件 */
	c := newCloseNotifier()
	conn := &outgoingConn{
		t:       t,   /* 連線型別 */
		Writer:  w,   /* reponse writer */
		Flusher: w.(http.Flusher),  /* reponse flusher */
		Closer:  c,  /* 連線close channel物件 */
	}
	p.attachOutgoingConn(conn) /* 會發streamWriter run中connc操作 用於*/
	<-c.closeNotify() /* 等待close channel,若一直沒資料可讀則阻塞 */
}

        通過attach方法,可知會把conn物件寫到channel cw.connc中,channel另外一端就在run方法中,下面為run的部分程式碼片段:

case conn := <-cw.connc: /* 從channel讀取conn物件,表示會話已經建立 */
	cw.mu.Lock()
	closed := cw.closeUnlocked()
	t = conn.t
	switch conn.t { /* 根據StreamType生成對應的解析器 */
	    case streamTypeMsgAppV2:
		enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
	    case streamTypeMessage:
		enc = &messageEncoder{w: conn.Writer}
	    default:
		plog.Panicf("unhandled stream type %s", conn.t)
	}
	flusher = conn.Flusher /* 用於send訊息 等待接收訊息 */
	unflushed = 0
	cw.status.activate()
	cw.closer = conn.Closer
	cw.working = true
	cw.mu.Unlock()

	if closed {
		plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
	}
	plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
	heartbeatc, msgc = tickc.C, cw.msgc //儲存心跳和message的通道

2.3 訊息傳送

        發訊息的介面為rafthttp/transport.gotransport.send方法,在介紹raft協議時會介紹如何呼叫此方法,目前只需要知道此方法用於傳送訊息即可。
func (t *Transport) Send(msgs []raftpb.Message) {
	for _, m := range msgs {
		if m.To == 0 {
			// ignore intentionally dropped message
			continue
		}
		to := types.ID(m.To) /* 將m.To轉成type.ID格式 */
/* 以to作為key在map中查詢peer物件 */
		t.mu.RLock()
		p, pok := t.peers[to]
		g, rok := t.remotes[to]
		t.mu.RUnlock()

		//存在peer則不去檢查remote
		if pok {
			if m.Type == raftpb.MsgApp {
				t.ServerStats.SendAppendReq(m.Size())
			}
			p.send(m)  /* 呼叫peer.go  (p *peer) send */
			continue
		}

		if rok {
			g.send(m)
			continue
		}

		plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
	}
}

func (p *peer) send(m raftpb.Message) {
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()

	if paused {
		return
	}
	// 如果訊息型別是snapshot則返回pipeline,如果是MsgApp則返回msgAppV2Writer,否則返回wirter
	// wirtec建立是在
	writec, name := p.pick(m) 
	select {
    /* 將訊息寫入channel中 
* 接收端的channel位於stream.go streamWriter.run msgc 
*/
	case writec <- m: //寫入channel
	default:
		p.r.ReportUnreachable(m.To)
		if isMsgSnap(m) {
			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if p.status.isActive() {
			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full 
                    (bad/overloaded network)", p.id, name)
		}
		plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
	}
}

    假設返回的writec為streamWriter型別,則上面writec定義在stream.go func (cw *streamWriter)run() ,到了這裡會發現在2.2.2節中介紹的會話建立流程也是在這個方法中。
    傳送訊息具體程式碼如下:
   //etcd大部分訊息是通過http協議 此處使用的http通道   

case m := <-msgc:
	err := enc.encode(&m) /* 格式化訊息,如選舉訊息 */
	if err == nil {
	    unflushed += m.Size()
            if len(msgc) == 0 || batched > streamBufSize/2 {/*batched批處理 streamBufSize全域性變數 4096 */
		flusher.Flush() /* 重新整理緩衝區,傳送到對端。Flush程式碼為net/http模組 */
		sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
		unflushed = 0
		batched = 0
	    } else {
		batched++
	    }
	    continue  /* 傳送完成就返回上層 並沒有結束會話 */
	}

	cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
	cw.close() /* 表示本次收發訊息結束 即http會話結束 */
	plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
	heartbeatc, msgc = nil, nil
	cw.r.ReportUnreachable(m.To)
	sentFailures.WithLabelValues(cw.peerID.String()).Inc()

        上述程式碼,有一個很關鍵的程式碼--continue。這段程式碼並不是像我們之前理解http請求一樣,收到request之後,做處理並且響應一個reponse,最後關閉http會話。然而這裡的做法是,傳送一個訊息後直接continue,並沒有結束會話。難道說就是利用http通道(建立的socket),進行長連線操作嗎?(c/s模式)。後來通過抓包,驗證了我的想法:


發現一些資料在通過2380這埠傳送資料(上圖中tcp資料長度是59位元組),具體內容wireshark無法解析。
至此,傳送流程介紹完畢,下面來看一下接收流程。

2.4 訊息接收

        在上一篇其實已經介紹了,接收流程,這裡再深入介紹一下。etcd中有兩個物件:streamReader和streamWriter,通過名字可知,用於讀寫網路流的。上一小節其實操作就是streamWriter,那麼關於接收流程肯定和streamReader相關,流程圖如下:


上一篇介紹到在rafthttp/stream.go中的run方法,cr.dial用於建立http會話(對應上述報文中沒有響應的http請求),cr.decodeLoop迴圈等待對端的訊息,程式碼如下:

func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	//根據stream型別,建立不同解碼器
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		plog.Panicf("unhandled stream type %s", t)
	}
	select {
	case <-cr.stopc:
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()
  //死迴圈 等待訊息
	for {
		m, err := dec.decode() //阻塞等待訊息
		if err != nil {
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}
		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()
		if paused {
			continue
		}
		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}
		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}
		select {
		case recvc <- m: /* 將訊息寫到channel中 channel另外一段是rafthttp/peer.go startPeer*/
		default:
			if cr.status.isActive() {
				plog.MergeWarningf("dropped internal raft message from %s since receiving 
                                                        buffer is full (overloaded network)", types.ID(m.From))
			}
			plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
	}
}

結合流程和相關程式碼,基本上可以梳理清楚。流圖中最後一個方法則進入raft相關處理,對於raft相關內容,後面會有介紹。

三、疑問解答

        經過上一節介紹,如下兩個問題就有答案了:
為什麼沒有應答?etcd使用http作為通道,說明白點就是使用socket通道,傳輸資料,並沒有完全遵守http協議流程。
難道不會超時嗎?首先反問一句,超時不超時是由誰決定?由客戶端決定!!客戶端在發出請求一段時間內沒有收到響應則認為超時,進行超時處理邏輯。但若客戶端沒有超時處理邏輯呢?那永遠都不會超時,所以超時並不是由協議決定而是由業務邏輯決定。
        至此所有關於網路模型相關的內容,介紹到這裡就算完全結束了,下一篇介紹核心重點之一Raft協議。