Etcd原始碼分析-網路模型進階篇
起初本篇打算介紹raft相關,但是後來發現,還是有必要再深入介紹一下網路模型。
一、基礎網路模型
紅色框,符合上面經典模型,請求+應答。
藍色框,只有請求,沒有應答。
對於上述藍色框中請求,為什麼沒有應答呢?難道不會超時嗎?
二、訊息流程
2.1 http handler
上一篇介紹到,在使用net/http模組需要使用者自定義http handler(相當於http路由),針對不同http請求,定義不同handler。那麼對於etcd中和peer相關的handler有哪些呢?在檔案rafthttp/transport.go中:
func (t *Transport) Handler() http.Handler { pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) mux := http.NewServeMux() //http 請求路由 mux.Handle(RaftPrefix, pipelineHandler) /* /raft */ mux.Handle(RaftStreamPrefix+"/", streamHandler) /* /raft/stream/ */ mux.Handle(RaftSnapshotPrefix, snapHandler) /* /raft/snapshot */ mux.Handle(ProbingPrefix, probing.NewHandler()) /* /raft/probing */ return mux }
上面所有羅列出的handler並不是所有,只把相關介紹一下,我們只要知道,不同的url會有與之對應handler即可。
2.2 會話建立
對於第一節中,我們以GET /raft/stream/message/8a840eaa4b694be1進行說明,因為這個是最複雜的。2.2.1 報文
首先來看一下,傳送http報文,本端Ip為192.63.63.1,遠端Ip為192.63.63.30名稱 | 含義 |
Host | 服務端ip地址以及埠 |
X-Etcd-Cluster-Id | 叢集id,每個etcd節點都會隨機生成 |
X-Min-Cluster-Version | 叢集要求最低版本 |
X-Peerurls | 告訴對端etcd節點,我(本端)監聽的peer地址是什麼 |
X-Raft-To | 遠端etcd節點id |
X-Server-Form | 本端etcd節點id,用於標識唯一etcd節點。與url後面數字一致 |
2.2.2 會話建立
上面的報文,在哪裡構造出來?在哪裡發出去呢?流程圖如下:rafthttp/http.go,會發現是ServeHTTP方法,這個方法在上一篇已經介紹!
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { w.Header().Set("Allow", "GET") http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return } /* * 忽略部分程式碼,這部分程式碼主要使用構造http頭部資訊 */ /* 這個地方需要注意一下,此處並沒有包把應答報文發出去,但是具體處理邏輯需要參考net/http中Flush */ w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() /* 構造conn物件 */ c := newCloseNotifier() conn := &outgoingConn{ t: t, /* 連線型別 */ Writer: w, /* reponse writer */ Flusher: w.(http.Flusher), /* reponse flusher */ Closer: c, /* 連線close channel物件 */ } p.attachOutgoingConn(conn) /* 會發streamWriter run中connc操作 用於*/ <-c.closeNotify() /* 等待close channel,若一直沒資料可讀則阻塞 */ }
通過attach方法,可知會把conn物件寫到channel cw.connc中,channel另外一端就在run方法中,下面為run的部分程式碼片段:
case conn := <-cw.connc: /* 從channel讀取conn物件,表示會話已經建立 */
cw.mu.Lock()
closed := cw.closeUnlocked()
t = conn.t
switch conn.t { /* 根據StreamType生成對應的解析器 */
case streamTypeMsgAppV2:
enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
case streamTypeMessage:
enc = &messageEncoder{w: conn.Writer}
default:
plog.Panicf("unhandled stream type %s", conn.t)
}
flusher = conn.Flusher /* 用於send訊息 等待接收訊息 */
unflushed = 0
cw.status.activate()
cw.closer = conn.Closer
cw.working = true
cw.mu.Unlock()
if closed {
plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
heartbeatc, msgc = tickc.C, cw.msgc //儲存心跳和message的通道
2.3 訊息傳送
發訊息的介面為rafthttp/transport.gotransport.send方法,在介紹raft協議時會介紹如何呼叫此方法,目前只需要知道此方法用於傳送訊息即可。func (t *Transport) Send(msgs []raftpb.Message) {
for _, m := range msgs {
if m.To == 0 {
// ignore intentionally dropped message
continue
}
to := types.ID(m.To) /* 將m.To轉成type.ID格式 */
/* 以to作為key在map中查詢peer物件 */
t.mu.RLock()
p, pok := t.peers[to]
g, rok := t.remotes[to]
t.mu.RUnlock()
//存在peer則不去檢查remote
if pok {
if m.Type == raftpb.MsgApp {
t.ServerStats.SendAppendReq(m.Size())
}
p.send(m) /* 呼叫peer.go (p *peer) send */
continue
}
if rok {
g.send(m)
continue
}
plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
}
}
func (p *peer) send(m raftpb.Message) {
p.mu.Lock()
paused := p.paused
p.mu.Unlock()
if paused {
return
}
// 如果訊息型別是snapshot則返回pipeline,如果是MsgApp則返回msgAppV2Writer,否則返回wirter
// wirtec建立是在
writec, name := p.pick(m)
select {
/* 將訊息寫入channel中
* 接收端的channel位於stream.go streamWriter.run msgc
*/
case writec <- m: //寫入channel
default:
p.r.ReportUnreachable(m.To)
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if p.status.isActive() {
plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full
(bad/overloaded network)", p.id, name)
}
plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
}
}
假設返回的writec為streamWriter型別,則上面writec定義在stream.go func (cw *streamWriter)run() ,到了這裡會發現在2.2.2節中介紹的會話建立流程也是在這個方法中。
傳送訊息具體程式碼如下:
//etcd大部分訊息是通過http協議 此處使用的http通道
case m := <-msgc:
err := enc.encode(&m) /* 格式化訊息,如選舉訊息 */
if err == nil {
unflushed += m.Size()
if len(msgc) == 0 || batched > streamBufSize/2 {/*batched批處理 streamBufSize全域性變數 4096 */
flusher.Flush() /* 重新整理緩衝區,傳送到對端。Flush程式碼為net/http模組 */
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
batched = 0
} else {
batched++
}
continue /* 傳送完成就返回上層 並沒有結束會話 */
}
cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
cw.close() /* 表示本次收發訊息結束 即http會話結束 */
plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
sentFailures.WithLabelValues(cw.peerID.String()).Inc()
上述程式碼,有一個很關鍵的程式碼--continue。這段程式碼並不是像我們之前理解http請求一樣,收到request之後,做處理並且響應一個reponse,最後關閉http會話。然而這裡的做法是,傳送一個訊息後直接continue,並沒有結束會話。難道說就是利用http通道(建立的socket),進行長連線操作嗎?(c/s模式)。後來通過抓包,驗證了我的想法:
發現一些資料在通過2380這埠傳送資料(上圖中tcp資料長度是59位元組),具體內容wireshark無法解析。
至此,傳送流程介紹完畢,下面來看一下接收流程。
2.4 訊息接收
在上一篇其實已經介紹了,接收流程,這裡再深入介紹一下。etcd中有兩個物件:streamReader和streamWriter,通過名字可知,用於讀寫網路流的。上一小節其實操作就是streamWriter,那麼關於接收流程肯定和streamReader相關,流程圖如下:
上一篇介紹到在rafthttp/stream.go中的run方法,cr.dial用於建立http會話(對應上述報文中沒有響應的http請求),cr.decodeLoop迴圈等待對端的訊息,程式碼如下:
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
//根據stream型別,建立不同解碼器
switch t {
case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage:
dec = &messageDecoder{r: rc}
default:
plog.Panicf("unhandled stream type %s", t)
}
select {
case <-cr.stopc:
cr.mu.Unlock()
if err := rc.Close(); err != nil {
return err
}
return io.EOF
default:
cr.closer = rc
}
cr.mu.Unlock()
//死迴圈 等待訊息
for {
m, err := dec.decode() //阻塞等待訊息
if err != nil {
cr.mu.Lock()
cr.close()
cr.mu.Unlock()
return err
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
cr.mu.Lock()
paused := cr.paused
cr.mu.Unlock()
if paused {
continue
}
if isLinkHeartbeatMessage(&m) {
// raft is not interested in link layer
// heartbeat message, so we should ignore
// it.
continue
}
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m: /* 將訊息寫到channel中 channel另外一段是rafthttp/peer.go startPeer*/
default:
if cr.status.isActive() {
plog.MergeWarningf("dropped internal raft message from %s since receiving
buffer is full (overloaded network)", types.ID(m.From))
}
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
}
}
}
結合流程和相關程式碼,基本上可以梳理清楚。流圖中最後一個方法則進入raft相關處理,對於raft相關內容,後面會有介紹。
三、疑問解答
經過上一節介紹,如下兩個問題就有答案了:為什麼沒有應答?etcd使用http作為通道,說明白點就是使用socket通道,傳輸資料,並沒有完全遵守http協議流程。
難道不會超時嗎?首先反問一句,超時不超時是由誰決定?由客戶端決定!!客戶端在發出請求一段時間內沒有收到響應則認為超時,進行超時處理邏輯。但若客戶端沒有超時處理邏輯呢?那永遠都不會超時,所以超時並不是由協議決定而是由業務邏輯決定。
至此所有關於網路模型相關的內容,介紹到這裡就算完全結束了,下一篇介紹核心重點之一Raft協議。