1. 程式人生 > >ETCD v2 原始碼分析

ETCD v2 原始碼分析

Etcd is a distributed, consistent key-valuestore for shared configurationand service discovery

ETCD 有 v2和 v3,api 和內部儲存都不一樣。這裡只分析 v2。
基於原始碼git tag v2.0.0 :git checkout -b version2 v2.0.0

一致性協議使用 raft,raft 協議不在本文敘述範圍內。raft 協議相關見ETCD - raft

ETCD 整體架構
在這裡插入圖片描述

store 整體結構:
在這裡插入圖片描述
內部 kv 儲存是純記憶體方式,kv 儲存使用 btree結構。

worldLock 用來加鎖,在做操作的時候都會加鎖,將所有操作序列化,但這並不是導致 qps 上不去的原因,實際上影響更大的是 raft 協議的互動。
currentIndex 是當前 raft 的最新 index
watchHub 是客戶端訂閱的 key 資訊,每個 key 對應一個 watcher 列表。同時它還有個 EventHistory,記錄最新的1000個更新。
Root 是 BTree的根節點,從它開始可以找到所有的 key

kv http server 入口

func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

通過

func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) 

將 http 引數轉換為請求結構體:

type Request struct {
	ID               uint64 `protobuf:"varint,1,req" json:"ID"`
	Method           string
`protobuf:"bytes,2,req" json:"Method"` Path string `protobuf:"bytes,3,req" json:"Path"` Val string `protobuf:"bytes,4,req" json:"Val"` Dir bool `protobuf:"varint,5,req" json:"Dir"` PrevValue string `protobuf:"bytes,6,req" json:"PrevValue"` PrevIndex uint64
`protobuf:"varint,7,req" json:"PrevIndex"` PrevExist *bool `protobuf:"varint,8,req" json:"PrevExist,omitempty"` Expiration int64 `protobuf:"varint,9,req" json:"Expiration"` Wait bool `protobuf:"varint,10,req" json:"Wait"` Since uint64 `protobuf:"varint,11,req" json:"Since"` Recursive bool `protobuf:"varint,12,req" json:"Recursive"` Sorted bool `protobuf:"varint,13,req" json:"Sorted"` Quorum bool `protobuf:"varint,14,req" json:"Quorum"` Time int64 `protobuf:"varint,15,req" json:"Time"` Stream bool `protobuf:"varint,16,req" json:"Stream"` XXX_unrecognized []byte `json:"-"` }
// Do interprets r and performs an operation on s.store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
	r.ID = s.reqIDGen.Next()
	if r.Method == "GET" && r.Quorum {
		r.Method = "QGET"
	}
	switch r.Method {
	case "POST", "PUT", "DELETE", "QGET":
		data, err := r.Marshal()
		if err != nil {
			return Response{}, err
		}
		ch := s.w.Register(r.ID)
		s.r.Propose(ctx, data)
		select {
		case x := <-ch:
			resp := x.(Response)
			return resp, resp.err
		case <-ctx.Done():
			s.w.Trigger(r.ID, nil) // GC wait
			return Response{}, parseCtxErr(ctx.Err())
		case <-s.done:
			return Response{}, ErrStopped
		}
	case "GET":
		switch {
		case r.Wait:
			wc, err := s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
			if err != nil {
				return Response{}, err
			}
			return Response{Watcher: wc}, nil
		default:
			ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted)
			if err != nil {
				return Response{}, err
			}
			return Response{Event: ev}, nil
		}
	case "HEAD":
		ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted)
		if err != nil {
			return Response{}, err
		}
		return Response{Event: ev}, nil
	default:
		return Response{}, ErrUnknownMethod
	}
}

可以看到,除了 GET 方法和 HEAD 方法是直接在本機執行外,其它方法都需要其它的 ETCD raft 節點參與,實現方法是在wait.Wait裡面Register一個channel,然後向 raft 叢集提出 Propose 提議,然後等待該channel返回。

Get

func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error)

直接從本 server 的 store 中獲取 value,沒有和其它ETCD 節點互動。如果該節點還沒有跟上 leader 的提交,可能獲取到舊資料。
核心方法是:

// InternalGet gets the node of the given nodePath.
func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
	nodePath = path.Clean(path.Join("/", nodePath))

	walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {

		if !parent.IsDir() {
			err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
			return nil, err
		}

		child, ok := parent.Children[name]
		if ok {
			return child, nil
		}

		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
	}

	f, err := s.walk(nodePath, walkFunc)

	if err != nil {
		return nil, err
	}
	return f, nil
}

因為 key 是以"/"分割的目錄式結構。在 get 的時候,會從根目錄(樹根)開始一級一級往下走,如果某級路徑不存在,則返回錯誤。

watch (Wait/Since)

client 和 server 建立長連線。阻塞等待 server 返回事件通知。
server 內部首先去找到一個跟 key 繫結的 watcher,然後等待在改該 watcher 的 channel 上

EventHistory
ETCD 在EventHistory裡面維護了最近更新的1000個 event。這些 event 根據 index 大小排隊在eventQueue裡面。
一個 watch 過來的時候,會根據 watch 的 sinceIndex(waitIndex),去EventHistory從該 sinceIndex 開始對每個 event 遍歷,匹配 key(如果是recursive則匹配字首,否則完全匹配),如果匹配到了就直接返回。
如果沒有匹配到EventHistory,則在watcherHub中註冊一個該 key 的 watcher。
然後等待 watcher 中的 channel 事件

有兩個細節:

  1. EventHistroy 是有長度限制的,最長1000。也就是說,如果你的客戶端停了許久,然後重新watch的時候,可能和該waitIndex相關的event已經被淘汰了,這種情況下會丟失變更。
  2. 如果通知watch的時候,出現了阻塞(每個watch的channel有100個緩衝空間),Etcd 會直接把watcher刪除,也就是會導致wait請求的連線中斷,客戶端需要重新連線。

quorum, 一致性讀取

quorum=true 的時候,與更新操作一樣,讀取是通過raft叢集進行的,Get 請求會被 leader 廣播到所有的 follower 執行相同的 Get 請求,然後返回 Get 結果。跟 update 操作一樣,如果沒有半數以上的 節點 執行 Get操作,說明有半數以上的節點還沒有該 key 的內容或者該 key 的內容不是最新的(通過 raft 的 index 保證), 該 Get 操作與 update操作一樣會有一個 index,follower 執行該 index 操作的條件是它有前序的所有 index 內容。從而保證了順序性。

一致性讀取的情況下,每次讀取也需要走一次raft協議,能保證一致性,但效能有損失,如果出現網路分割槽,叢集的少數節點是不能提供一致性讀取的。但如果不設定該引數,則是直接從本地的store裡讀取,這樣就損失了一致性。使用的時候需要注意根據應用場景設定這個引數,在一致性和可用性之間進行取捨。

Put

前面說過,Put請求需要在wait.Wait裡面Register一個channel, 向raft 叢集提出 Propose 提議,然後等待該channel返回。當 raft 叢集達到一致,commit 該請求的時候,會呼叫 apply 方法,該方法會呼叫 Wait.Trigger 向 channel傳送處理結果。

// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
	var applied uint64
	var shouldstop bool
	var err error
	for i := range es {
		e := es[i]
		switch e.Type {
		case raftpb.EntryNormal:
			var r pb.Request
			pbutil.MustUnmarshal(&r, e.Data)
			s.w.Trigger(r.ID, s.applyRequest(r))
		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			shouldstop, err = s.applyConfChange(cc, confState)
			s.w.Trigger(cc.ID, err)
		default:
			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
		}
		atomic.StoreUint64(&s.r.index, e.Index)
		atomic.StoreUint64(&s.r.term, e.Term)
		applied = e.Index
	}
	return applied, shouldstop
}

真正執行 apply 的方法:

// applyRequest interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) applyRequest(r pb.Request) Response {
	f := func(ev *store.Event, err error) Response {
		return Response{Event: ev, err: err}
	}
	expr := timeutil.UnixNanoToTime(r.Expiration)
	switch r.Method {
	case "POST":
		return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr))
	case "PUT":
		exists, existsSet := pbutil.GetBool(r.PrevExist)
		switch {
		case existsSet:
			if exists {
				return f(s.store.Update(r.Path, r.Val, expr))
			}
			return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
		case r.PrevIndex > 0 || r.PrevValue != "":
			return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
		default:
			if storeMemberAttributeRegexp.MatchString(r.Path) {
				id := mustParseMemberIDFromKey(path.Dir(r.Path))
				var attr Attributes
				if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
					log.Panicf("unmarshal %s should never fail: %v", r.Val, err)
				}
				s.Cluster.UpdateAttributes(id, attr)
			}
			return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
		}
	case "DELETE":
		switch {
		case r.PrevIndex > 0 || r.PrevValue != "":
			return f(s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
		default:
			return f(s.store.Delete(r.Path, r.Dir, r.Recursive))
		}
	case "QGET":
		return f(s.store.Get(r.Path, r.Recursive, r.Sorted))
	case "SYNC":
		s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
		return Response{}
	default:
		// This should never be reached, but just in case:
		return Response{err: ErrUnknownMethod}
	}
}

對於 Put 操作,如果沒有指定 PrevExist 和 PrevIndex這些,則會直接讓 store 執行更新:

// Set creates or replace the node at nodePath.
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
	var err error

	s.worldLock.Lock()
	defer s.worldLock.Unlock()

	defer func() {
		if err == nil {
			s.Stats.Inc(SetSuccess)
		} else {
			s.Stats.Inc(SetFail)
		}
	}()

	// Get prevNode value
	n, getErr := s.internalGet(nodePath)
	if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
		err = getErr
		return nil, err
	}

	// Set new value
	e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
	if err != nil {
		return nil, err
	}
	e.EtcdIndex = s.CurrentIndex

	// Put prevNode into event
	if getErr == nil {
		prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
		prev.Node.loadInternalNode(n, false, false, s.clock)
		e.PrevNode = prev.Node
	}

	s.WatcherHub.notify(e)

	return e, nil
}

可以看到執行更新的時候使用了 worldLock 這把大鎖,所有更新操作都會序列執行。
建立節點:

func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
	expireTime time.Time, action string) (*Event, error) {

	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1

	if unique { // append unique item under the node path
		nodePath += "/" + strconv.FormatUint(nextIndex, 10)
	}

	nodePath = path.Clean(path.Join("/", nodePath))

	// we do not allow the user to change "/"
	if nodePath == "/" {
		return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
	}

	// Assume expire times that are way in the past are
	// This can occur when the time is serialized to JS
	if expireTime.Before(minExpireTime) {
		expireTime = Permanent
	}

	dirName, nodeName := path.Split(nodePath)

	// walk through the nodePath, create dirs and get the last directory node
	d, err := s.walk(dirName, s.checkDir)

	if err != nil {
		s.Stats.Inc(SetFail)
		err.Index = currIndex
		return nil, err
	}

	e := newEvent(action, nodePath, nextIndex, nextIndex)
	eNode := e.Node

	n, _ := d.GetChild(nodeName)

	// force will try to replace a existing file
	if n != nil {
		if replace {
			if n.IsDir() {
				return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
			}
			e.PrevNode = n.Repr(false, false, s.clock)

			n.Remove(false, false, nil)
		} else {
			return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
		}
	}

	if !dir { // create file
		// copy the value for safety
		valueCopy := value
		eNode.Value = &valueCopy

		n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)

	} else { // create directory
		eNode.Dir = true

		n = newDir(s, nodePath, nextIndex, d, "", expireTime)
	}

	// we are sure d is a directory and does not have the children with name n.Name
	d.Add(n)

	// node with TTL
	if !n.IsPermanent() {
		s.ttlKeyHeap.push(n)

		eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
	}

	s.CurrentIndex = nextIndex

	return e, nil
}

更新完成後,需要通知訂閱該 key 變更的 watcher 列表,ECTD 會從該 key 的第一級目錄開始遍歷watcherHub,通知所有的 watcher 有變更發生,向該 watcher 的 eventChan 傳送變更的 Event:

// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
	e = wh.EventHistory.addEvent(e) // add event into the eventHistory

	segments := strings.Split(e.Node.Key, "/")

	currPath := "/"

	// walk through all the segments of the path and notify the watchers
	// if the path is "/foo/bar", it will notify watchers with path "/",
	// "/foo" and "/foo/bar"

	for _, segment := range segments {
		currPath = path.Join(currPath, segment)
		// notify the watchers who interests in the changes of current path
		wh.notifyWatchers(e, currPath, false)
	}
}

TTL

key的過期以及續約機制是服務發現的基礎,服務提供者在註冊服務的時候可以在某個目錄下建立自己的節點,然後設定 TTL,也就是開始一個租約。然後定期去重新整理這個 TTL 續約,這相當於心跳。服務呼叫者從 ETCD recursive Get 該目錄就能獲得所有的下游節點資訊,它也可以通過 Watch 機智來獲取下游節點變更的提醒。

從internalCreate看到,在建立(或更新)的最後,如果該 key 設定了 ttl,則會執行s.ttlKeyHeap.push(n)
該node 被掛到一個最小堆裡面,該堆使用 node.ExpireTime 排序。因此堆頂的節點就是最先過期的節點,容易找到過期的節點並執行刪除操作。

CAS(Compare-and-Swap)

可以用於分散式鎖以及leader選舉。
主要通過 PrevExist/PrevIndex/PrevValue 實現
如果指定了以上引數,會先 get 一下看看原來的值是否符合條件,符合條件再操作。
見 store.CompareAndSwap

POST 自增 key

Atomically Creating In-Order Keys
Using POST on a directory, you can create keys with key names that are created in-order.

{
    "action": "create",
    "node": {
        "createdIndex"
            
           

相關推薦

ETCD v2 原始碼分析

Etcd is a distributed, consistent key-valuestore for shared configurationand service discovery ETCD 有 v2和 v3,api 和內部儲存都不一樣。這裡只分析 v

Hadoop2原始碼分析-MapReduce v2架構

1.概述   前面我們已經對Hadoop有了一個初步認識,接下來我們開始學習Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天為大家分享的是mapreduce部分,其內容目錄如下所示: MapReduce V1 M

YOLO v2 損失函式原始碼分析

損失函式的定義是在region_layer.c檔案中,關於region層使用的引數在cfg檔案的最後一個section中定義。 首先來看一看region_layer 都定義了那些屬性值: layer make_region_layer(int batch, int w, int h, int n,

【原創】k8s原始碼分析------第三方庫etcd client分析

程式碼為github.com/coreos/go-etcd/etcd 注: 此版本為k8s v1.1.1 中所使用的etcd client。 首先我看下k8s中是如何使用的。位置在

Etcd原始碼分析-網路模型

從本篇開始,將會有一個系列對Etcd原始碼進行分析。我之前閱讀過很多開源軟體,對閱讀開源軟體,有如下基本思路:1、瞭解該軟體相關背景知識,例如相關部落格、官網,要相信自己不是第一個分析該軟體的人2、對該軟體進行使用,例如:編譯、執行或者基於介面進行開發3、找到該軟體的合適切入

Etcd原始碼分析-網路模型進階篇

起初本篇打算介紹raft相關,但是後來發現,還是有必要再深入介紹一下網路模型。一、基礎網路模型        Etcd採用http(https)協議作為應用層協議,關於http協議介紹不是本篇範疇。大家都知道http一般情況下是無狀態協議,且網路是位請求+應答,當收到應答ht

mybatis原理,配置介紹及原始碼分析

前言 mybatis核心元件有哪些?它是工作原理是什麼? mybatis配置檔案各個引數是什麼含義? mybatis只添加了介面類,沒有實現類,為什麼可以直接查詢呢? mybatis的mapper對映檔案各個引數又是什麼含義? mybatis-spring提供哪些機制簡化了原生mybatis? m

Spark原始碼分析之Spark Shell(上)

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

Android與JS之JsBridge使用與原始碼分析

在Android開發中,由於Native開發的成本較高,H5頁面的開發更靈活,修改成本更低,因此前端網頁JavaScript(下面簡稱JS)與Java之間的互相呼叫越來越常見。 JsBridge就是一個簡化Android與JS通訊的框架,原始碼:https://github.com/lzyzsd

Flink on Yarn模式啟動流程原始碼分析

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從原始碼角度看下這個實現,可能有的地方理解有誤,請給予指正,多謝。 --> 1.命令列啟動yarn sessi

菜鳥帶你看原始碼——看不懂你打我ArrayList原始碼分析(基於java 8)

文章目錄 看原始碼並不難 軟體環境 成員變數: 構造方法 核心方法 get方法 remove方法 add方法 結束 看原始碼並不難 如何學好程式設計?如何寫出優質的程式碼?如

區塊鏈教程Fabric1.0原始碼分析flogging(Fabric日誌系統)

  區塊鏈教程Fabric1.0原始碼分析flogging(Fabric日誌系統),2018年下半年,區塊鏈行業正逐漸褪去發展之初的浮躁、迴歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們更多的關注點放在了區塊鏈真正的技術之上。 Fabric 1.0原始碼筆記 之 flo

區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端二

  區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端二 Fabric 1.0原始碼筆記 之 gossip(流言演算法) #GossipServer(Gossip服務端) 5.2、commImpl結構體方法 //conn.serviceConnection(),啟動連線服務 func (

區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端一

  區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端一,2018年下半年,區塊鏈行業正逐漸褪去發展之初的浮躁、迴歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們更多的關注點放在了區塊鏈真正的技術之上。 Fabric 1.0原始碼筆記 之 gossip(流

PyTorch--雙向遞迴神經網路(B-RNN)概念,原始碼分析

  關於概念:   BRNN連線兩個相反的隱藏層到同一個輸出.基於生成性深度學習,輸出層能夠同時的從前向和後向接收資訊.該架構是1997年被Schuster和Paliwal提出的.引入BRNNS是為了增加網路所用的輸入資訊量.例如,多層感知機(MLPS)和延時神經網路(TDNNS)在輸入資料的靈活性方面是非

Android ADB 原始碼分析(三)

前言 之前分析的兩篇文章 Android Adb 原始碼分析(一) 嵌入式Linux:Android root破解原理(二)   寫完之後,都沒有寫到相關的實現程式碼,這篇文章寫下ADB的通訊流程的一些細節 看這篇文章之前,請先閱讀 Linux的SOCKET

Android Adb 原始碼分析

扭起屁股得意洋洋 最近,我負責的專案因為臨近量產,把之前的userdebug版本關閉,轉成了user版本,增加selinux的許可權,大家都洋溢在專案準備量產的興奮和喜悅之中不能自拔 誰知,好景不長,user版本釋出之後,各種bug接踵而來,但是因為user版本許可權的原因,我們之前保留

【Spring Boot】(29)、SpringBoot整合Mybatis原始碼分析

在【Spring Boot】(23)、Spring Boot整合Mybatis的章節中講述了SpringBoot整合Mybatis的過程,以及一些配置說明,這節主要講解一下整合的原始碼。 廢話不多說,直接進入今天的主題。 閱讀過我之前寫的文章的童靴,肯定知道SpringBoot整合第三方

【轉載】C++ 智慧指標(shared_ptr/weak_ptr)原始碼分析

發現一篇對C++11智慧指標分析很透徹的文章,特轉載備忘! 以下轉載自:https://blog.csdn.net/ithiker/article/details/51532484?utm_source=blogxgwz1   C++11目前已經引入了unique_ptr, shared_pt

x265原始碼分析:sao.cpp 自適應樣點補償

/* 對|num / den|四捨五入,然後前面新增符號 */ inline int32_t roundIBDI(int32_t num, int32_t den) { return num >= 0 ? ((num * 2 + den)/(den * 2)) : -((-