1. 程式人生 > >zookeeper演算法研究

zookeeper演算法研究

Paxos分散式一致性演算法 Paxos是一個基於訊息傳遞的一致性演算法,近幾年被廣泛應用於分散式計算中,Google的Chubby,Apache的Zookeeper都是基於它的理論來實現的,Paxos還被認為是到目前為止唯一的分散式一致性演算法,其它的演算法都是Paxos的改進或簡化。Paxos只有在一個可信的計算環境中才能成立,這個環境是不會被入侵所破壞的。 Paxos描述了這樣一個場景,有一個叫做Paxos的小島(Island)上面住了一批居民,島上面所有的事情由一些特殊的人決定,他們叫做議員(Senator)。議員的總數(Senator Count)是確定的,不能更改。島上每次環境事務的變更都需要通過一個提議(Proposal),每個提議都有一個編號(PID),這個編號是一直增長的,不能倒退。每個提議都需要超過半數((Senator Count)/2 +1)的議員同意才能生效。每個議員只會同意大於當前編號的提議,包括已生效的和未生效的。如果議員收到小於等於當前編號的提議,他會拒絕,並告知對方:你的提議已經有人提過了。這裡的當前編號是每個議員在自己記事本上面記錄的編號,他不斷更新這個編號。整個議會不能保證所有議員記事本上的編號總是相同的。現在議會有一個目標:保證所有的議員對於提議都能達成一致的看法。
現在議會開始運作,所有議員一開始記事本上面記錄的編號都是0。有一個議員發了一個提議:將電費設定為1元/度。他首先看了一下記事本,嗯,當前提議編號是0,那麼我的這個提議的編號就是1,於是他給所有議員發訊息:1號提議,設定電費1元/度。其他議員收到訊息以後查了一下記事本,哦,當前提議編號是0,這個提議可接受,於是他記錄下這個提議並回復:我接受你的1號提議,同時他在記事本上記錄:當前提議編號為1。發起提議的議員收到了超過半數的回覆,立即給所有人發通知:1號提議生效!收到的議員會修改他的記事本,將1好提議由記錄改成正式的法令,當有人問他電費為多少時,他會檢視法令並告訴對方:1元/度。 現在看衝突的解決
:假設總共有三個議員S1-S3,S1和S2同時發起了一個提議:1號提議,設定電費。S1想設為1元/度, S2想設為2元/度。結果S3先收到了S1的提議,於是他做了和前面同樣的操作。緊接著他又收到了S2的提議,結果他一查記事本,咦,這個提議的編號小於等於我的當前編號1,於是他拒絕了這個提議:對不起,這個提議先前提過了。於是S2的提議被拒絕,S1正式釋出了提議: 1號提議生效。S2向S1或者S3打聽並更新了1號法令的內容,然後他可以選擇繼續發起2號提議。
現在讓我們來對號入座,看看在ZK Server裡面Paxos是如何得以貫徹實施的。 小島(Island)——ZK Server Cluster
議員(Senator)——ZK Server 提議(Proposal)——ZNode Change(Create/Delete/SetData…) 提議編號(PID)——Zxid(ZooKeeper Transaction Id) 正式法令——所有ZNode及其資料 在所有議員中設立一個總統,只有總統有權發出提議,如果議員有自己的提議,必須發給總統並由總統來提出。 總統——ZK Server Leader 現在我們假設總統已經選好了,下面看看ZK Server是怎麼實施的。 情況一: 屁民甲(Client)到某個議員(ZK Server)那裡詢問(Get)某條法令的情況(ZNode的資料),議員毫不猶豫的拿出他的記事本(local storage),查閱法令並告訴他結果,同時宣告:我的資料不一定是最新的。你想要最新的資料?沒問題,等著,等我找總統Sync一下再告訴你。 情況二: 屁民乙(Client)到某個議員(ZK Server)那裡要求政府歸還欠他的一萬元錢,議員讓他在辦公室等著,自己將問題反映給了總統,總統詢問所有議員的意見,多數議員表示欠屁民的錢一定要還,於是總統發表宣告,從國庫中拿出一萬元還債,國庫總資產由100萬變成99萬。屁民乙拿到錢回去了(Client函式返回)。 情況三: 總統突然掛了,議員接二連三的發現聯絡不上總統,於是各自發表宣告,推選新的總統,總統大選期間政府停業,拒絕屁民的請求。
Zookeeper中的FastLeaderElection演算法 我們知道,在經典的paxos演算法中每一個peer都是proposer,但是這就不可避免的產生提案衝突,為了減少這種衝突帶來的系統消耗與時間延遲,就產生了Leader這個角色,整個系統中,就只允許Leader可以發出提案。ZooKeeper就是按照這個思路來實現的。本文主要討論ZooKeeper中的FastLeaderElection演算法,來講解Leader是如何產生的。 Zookeeper中的每一個peer在絕大多數情況下是共用同一個配置檔案的,所以每一個peer得到的其他peer的資訊(編號、ip地址、選舉埠號)是一樣的。Zookeeper系統啟動之後,在為客戶提供服務之前,是必須要產生一個Leader的。所以就必須先要執行選舉Leader演算法。 在每一個peer的選舉演算法開始之後,預設採用FastLeaderElection演算法,每一個peer一方面向所有的peer傳送一張自己的選票,同時另一方面它也接受其他peer傳送過來的選票,然後不停的統計票數。當然在此之前,每一個peer都會建立一個Listener來監聽自己的埠號,以此來接受所有peer傳送過來的選票。從這一個過程中,實際上是每一個peer都是coordinator。這樣每一個peer都是coordinator話,系統就會產生n*n個網路連線,無疑消耗了大量的系統資源。但是ZooKeeper從中也是做過一些優化工作的。 首先,每一個節點在傳送選票之前都會判斷這張選票是否是傳送給自己的,如果是則直接存入統計選票的集合中,而不會通過網路連線傳送給自己。 其次,每一個peer節點只接受編號比自己大的peer的網路連線請求,這樣產生的網路連線數就是(n-1)*(n-1)/2。 另外,這個Leader的選舉過程中是不會發生活鎖的。因為在每一輪投票之後,每一個peer都會產生一張新選票,而每一個peer產生一張新選票的決策規則(peer資料版本號+編號)是一樣的,同時這個決策規則依賴的節點資料更新程度和節點編號在任何兩個peer之間是不可能重複的。 最後,我還想再補充一點的就是統計票數問題。ZooKeeper採用了一個高度抽象的表決器(QuorumVerifier)。例如,這個表決器可以僅僅只依賴票數,也可以依賴(票數+選票權重),等...決策策略。
Zookeeper核心機制 Zookeeper是Hadoop下的一個子專案,它是一個針對大型分散式系統的可靠的協調系統,提供的功能包括命名服務、配置維護、分散式同步、叢集服務等。 Zookeeper是可以叢集復制的,叢集間通過Zab(Zookeeper Atomic Broadcast)協議來保持資料的一致性。 該協議包括2個階段:leader election階段和Actomic broadcast階段。叢集中將選舉出一個leader,其他的機器則稱為follower,所有的寫操作都被傳送給leader,並通過broadcast將所有的更新告訴follower。當leader崩潰或者leader失去大多數的follower時,需要重新選舉出一個新的leader,讓所有的伺服器都恢復到一個正確的狀態。當leader被選舉出來,且大多數伺服器完成了和leader的狀態同步後,leader election的過程就結束了,將進入Atomic broadcast的過程。Actomic broadcast同步leader和follower之間的資訊,保證leader和follower具備相同的系統狀態。 Zookeeper叢集的結構圖如下: 路由和負載均衡的實現 當服務越來越多,規模越來越大時,對應的機器數量也越來越龐大,單靠人工來管理和維護服務及地址的配置資訊,已經越來越困難。並且,依賴單一的硬體負載均衡裝置或者使用LVS、Nginx等軟體方案進行路由和負載均衡排程,單點故障的問題也開始凸顯,一旦服務路由或者負載均衡伺服器宕機,依賴其的所有服務均將失效。如果採用雙機高可用的部署方案,使用一臺伺服器“stand by”,能部分解決問題,但是鑑於負載均衡裝置的昂貴成本,已難以全面推廣。 一旦伺服器與ZooKeeper集群斷開連線,節點也就不存在了,通過註冊相應的watcher,服務消費者能夠第一時間獲知服務提供者機器資訊的變更。利用其znode的特點和watcher機制,將其作為動態註冊和獲取服務資訊的配置中心,統一管理服務名稱和其對應的伺服器列表資訊,我們能夠近乎實時地感知到後端伺服器的狀態(上線、下線、宕機)。Zookeeper叢集間通過Zab協議,服務配置資訊能夠保持一致,而Zookeeper本身容錯特性和leader選舉機制,能保證我們方便地進行擴容。 Zookeeper中,服務提供者在啟動時,將其提供的服務名稱、伺服器地址、以節點的形式註冊到服務配置中心,服務消費者通過服務配置中心來獲得需要呼叫的服務名稱節點下的機器列表節點。通過前面所介紹的負載均衡演算法,選取其中一臺伺服器進行呼叫。當伺服器宕機或者下線時,由於znode非持久的特性,相應的機器可以動態地從服務配置中心裡面移除,並觸發服務消費者的watcher。在這個過程中,服務消費者只有在第一次呼叫服務時需要查詢服務配置中心,然後將查詢到的服務資訊快取到本地,後面的呼叫直接使用本地快取的服務地址列表資訊,而不需要重新發起請求到服務配置中心去獲取相應的服務地址列表,直到服務的地址列表有變更(機器上線或者下線),變更行為會觸發服務消費者註冊的相應的watcher進行服務地址的重新查詢。這種無中心化的結構,使得服務消費者在服務資訊沒有變更時,幾乎不依賴配置中心,解決了之前負載均衡裝置所導致的單點故障的問題,並且大大降低了服務配置中心的壓力。 通過Zookeeper來實現服務動態註冊、機器上線與下線的動態感知,擴容方便,容錯性好,且無中心化結構能夠解決之前使用負載均衡裝置所帶來的單點故障問題。只有當配置資訊更新時服務消費者才會去Zookeeper上獲取最新的服務地址列表,其他時候使用本地快取即可,這樣服務消費者在服務資訊沒有變更時,幾乎不依賴配置中心,能大大降低配置中心的壓力。
分析一下這個演算法不難發現,如果有3臺伺服器啟動,第一個向zookeeper“報告”的人會被當選為leader;如果它出現故障,第二個向zookeeper“報告”的人會被當選為leader,以此類推。這是一種非常原始的民主選舉制度,有一個象徵最高權力的“神器”,得到“神器”的就是大部落的酋長;很多人想要參選大酋長,那麼誰跑得快最先搶到“神器”誰就是大酋長;如果在後面的“執政”期間酋長因為“太墮落”被幹掉了那麼第二名自動接管“神器”變成大酋長。把上面的程式碼執行兩次,最先執行的程式會被選擇為leader;殺死第一個程序,第二個程序的控制檯會輸出自己當選為leader的資訊。(第二個程序不是立即輸出資訊,需要等待幾秒鐘)
有了zookeeper的一致性檔案系統,鎖的問題變得容易。鎖服務可以分為兩類,一個是保持獨佔,另一個是控制時序。 對於第一類,我們將zookeeper上的一個znode看作是一把鎖,通過createznode的方式來實現。所有客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。廁所有言:來也沖沖,去也沖沖,用完刪除掉自己建立的distribute_lock 節點就釋放出鎖。 對於第二類, /distribute_lock 已經預先存在,所有客戶端在它下面建立臨時順序編號目錄節點,和選master一樣,編號最小的獲得鎖,用完刪除,依次方便。
兩種型別的佇列: 1、同步佇列,當一個佇列的成員都聚齊時,這個佇列才可用,否則一直等待所有成員到達。 2、佇列按照 FIFO 方式進行入隊和出隊操作。 第一類,在約定目錄下建立臨時目錄節點,監聽節點數目是否是我們要求的數目。 第二類,和分散式鎖服務中的控制時序場景基本原理一致,入列有編號,出列按編號。 終於瞭解完我們能用zookeeper做什麼了,可是作為一個程式設計師,我們總是想狂熱瞭解zookeeper是如何做到這一點的,單點維護一個檔案系統沒有什麼難度,可是如果是一個叢集維護一個檔案系統保持資料的一致性就非常困難了。
對zookeeper來說,它採用的方式是寫任意。通過增加機器,它的讀吞吐能力和響應能力擴充套件性非常好,而寫,隨著機器的增多吞吐能力肯定下降(這也是它建立observer的原因),而響應能力則取決於具體實現方式,是延遲複製保持最終一致性,還是立即複製快速響應。
如何在zookeeper叢集中選舉出一個leader,zookeeper使用了三種演算法,具體使用哪種演算法,在配置檔案中是可以配置的,對應的配置項是”electionAlg”,其中1對應的是LeaderElection演算法,2對應的是AuthFastLeaderElection演算法,3對應的是FastLeaderElection演算法.預設使用FastLeaderElection演算法.其他兩種演算法我沒有研究過,就不多說了. 要理解這個演算法,最好需要一些paxos演算法的理論基礎. 1) 資料恢復階段 首先,每個在zookeeper伺服器先讀取當前儲存在磁碟的資料,zookeeper中的每份資料,都有一個對應的id值,這個值是依次遞增的,換言之,越新的資料,對應的ID值就越大. 2) 首次傳送自己的投票值 在讀取資料完畢之後,每個zookeeper伺服器傳送自己選舉的leader,這個協議中包含了以下幾部分的資料: 1)所選舉leader的id(就是配置檔案中寫好的每個伺服器的id) ,在初始階段,每臺伺服器的這個值都是自己伺服器的id,也就是它們都選舉自己為leader. 2)伺服器最大資料的id,這個值大的伺服器,說明存放了更新的資料. 3)邏輯時鐘的值,這個值從0開始遞增,每次選舉對應一個值,也就是說:如果在同一次選舉中,那麼這個值應該是一致的 2)邏輯時鐘值越大,說明這一次選舉leader的程序更新. 4)本機在當前選舉過程中的狀態,有以下幾種:LOOKING,FOLLOWING,OBSERVING,LEADING,顧名思義不必解釋了吧. 每臺伺服器將自己伺服器的以上資料傳送到叢集中的其他伺服器之後,同樣的也需要接收來自其他伺服器的資料,它將做以下的處理: 1) 如果所接收資料伺服器的狀態還是在選舉階段(LOOKING 狀態),那麼首先判斷邏輯時鐘值,又分為以下三種情況: a) 如果傳送過來的邏輯時鐘大於目前的邏輯時鐘,那麼說明這是更新的一次選舉,此時需要更新一下本機的邏輯時鐘值,同時將之前收集到的來自其他伺服器的選舉清空,因為這些資料已經不再有效了.然後判斷是否需要更新當前自己的選舉情況.在這裡是根據選舉leader id,儲存的最大資料id來進行判斷的,這兩種資料之間對這個選舉結果的影響的權重關係是:首先看資料id,資料id大者勝出;其次再判斷leader id,leader id大者勝出.然後再將自身最新的選舉結果(也就是上面提到的三種資料廣播給其他伺服器). 程式碼如下:     if (n.epoch > logicalclock) {     logicalclock = n.epoch;      recvset.clear();      if(totalOrderPredicate(n.leader, n.zxid,getInitId(), getInitLastLoggedZxid()))         updateProposal(n.leader, n.zxid);      else      updateProposal(getInitId(),getInitLastLoggedZxid());      sendNotifications();  其中的totalOrderPredicate函式就是根據傳送過來的封包中的leader id,資料id來與本機儲存的相應資料進行判斷的函式,返回true說明需要更新資料,於是呼叫updateProposal函式更新資料 b) 傳送過來資料的邏輯時鐘小於本機的邏輯時鐘 說明對方在一個相對較早的選舉程序中,這裡只需要將本機的資料傳送過去就是了 c) 兩邊的邏輯時鐘相同,此時也只是呼叫totalOrderPredicate函式判斷是否需要更新本機的資料,如果更新了再將自己最新的選舉結果廣播出去就是了. 實際上,在處理選票之前,還有一個預處理的動作,它發生在剛剛接收到關於vote的message的時候,具體過程如下:     1.判斷message的來源是不是observer,如果是,則告訴該observer我當前認為的Leader的資訊,否則進入2      2.判斷message是不是vote資訊,是則進入3      3.根據message建立一張vote      4.如果當前server處理LOOKING狀態,將vote放入自己的投票箱,而且如果vote源server處於LOOKING狀態同時vote源server選舉時舊的,則當前server通知它新的一輪投票;      5如果當前server不處於LOOKING狀態而vote源server處理LOOKING狀態,則當前server告訴它當前的Leader資訊。 三種情況的處理完畢之後,再處理兩種情況: 1)伺服器判斷是不是已經收集到了所有伺服器的選舉狀態,如果是那麼根據選舉結果設定自己的角色(FOLLOWING還是LEADER),然後退出選舉過程就是了. 2)即使沒有收集到所有伺服器的選舉狀態,也可以判斷一下根據以上過程之後最新的選舉leader是不是得到了超過半數以上伺服器的支援,如果是,那麼嘗試在200ms內接收一下資料,如果沒有新的資料到來,說明大家都已經默認了這個結果,同樣也設定角色退出選舉過程. 程式碼如下:     /*     * Only proceed if the vote comes from a replica in the     * voting view.     */      if(self.getVotingView().containsKey(n.sid)){      recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));      //If have received from all nodes, then terminate      if((self.getVotingView().size() == recvset.size()) && (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){      self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());      leaveInstance();      return new Vote(proposedLeader, proposedZxid);      } else if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock))) {      // Verify if there is any change in the proposed leader      while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){      if(totalOrderPredicate(n.leader, n.zxid,proposedLeader, proposedZxid)){         recvqueue.put(n);         break;      }     }      /*     * This predicate is true once we don't read any new     * relevant message from the reception queue     */      if (n == null) {      self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());      if(LOG.isDebugEnabled()){      LOG.debug("About to leave FLE instance: Leader= " + proposedLeader + ", Zxid = " + proposedZxid + ", My id = " + self.getId() + ", My state = " + self.getPeerState());      }      leaveInstance();      return new Vote(proposedLeader,proposedZxid);      }      }      }  2) 如果所接收伺服器不在選舉狀態,也就是在FOLLOWING或者LEADING狀態 做以下兩個判斷: a) 如果邏輯時鐘相同,將該資料儲存到recvset,如果所接收伺服器宣稱自己是leader,那麼將判斷是不是有半數以上的伺服器選舉它,如果是則設定選舉狀態退出選舉過程 b) 否則這是一條與當前邏輯時鐘不符合的訊息,那麼說明在另一個選舉過程中已經有了選舉結果,於是將該選舉結果加入到outofelection集合中,再根據outofelection來判斷是否可以結束選舉,如果可以也是儲存邏輯時鐘,設定選舉狀態,退出選舉過程. 程式碼如下:     if(n.epoch == logicalclock){      recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));      if((n.state == ServerState.LEADING) || (termPredicate(recvset, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) ){      self.setPeerState((n.leader == self.getId()) ?ServerState.LEADING: learningState());      leaveInstance();      return new Vote(n.leader, n.zxid);      }      }      outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));      if(termPredicate(outofelection, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) {         synchronized(this){            logicalclock = n.epoch;            self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());         }         leaveInstance();         return new Vote(n.leader, n.zxid);      }      break;      }      }  以一個簡單的例子來說明整個選舉的過程. 假設有五臺伺服器組成的zookeeper叢集,它們的id從1-5,同時它們都是最新啟動的,也就是沒有歷史資料,在存放資料量這一點上,都是一樣的.假設這些伺服器依序啟動,來看看會發生什麼.     1) 伺服器1啟動,此時只有它一臺伺服器啟動了,它發出去的報沒有任何響應,所以它的選舉狀態一直是LOOKING狀態      2) 伺服器2啟動,它與最開始啟動的伺服器1進行通訊,互相交換自己的選舉結果,由於兩者都沒有歷史資料,所以id值較大的伺服器2勝出,但是由於沒有達到超過半數以上的伺服器都同意選舉它(這個例子中的半數以上是3),所以伺服器1,2還是繼續保持LOOKING狀態.      3) 伺服器3啟動,根據前面的理論分析,伺服器3成為伺服器1,2,3中的老大,而與上面不同的是,此時有三臺伺服器選舉了它,所以它成為了這次選舉的leader.      4) 伺服器4啟動,根據前面的分析,理論上伺服器4應該是伺服器1,2,3,4中最大的,但是由於前面已經有半數以上的伺服器選舉了伺服器3,所以它只能接收當小弟的命了.      5) 伺服器5啟動,同4一樣,當小弟.
叢集只有3臺機器,所以server.1和server.2啟動後,即可選舉出Leader。後續Leader和Follower開始資料互動。 1.server啟動時預設選舉自己,並向整個叢集廣播 2.收到訊息時,通過3層判斷:選舉輪數,zxid,server id大小判斷是否同意對方,如果同意,則修改自己的選票,並向叢集廣播 3.QuorumCnxManager負責IO處理,每2個server建立一個連線,只允許id大的server連id小的server,每個server啟動單獨的讀寫執行緒處理,使用阻塞IO 4.預設超過半數機器同意時,則選舉成功,修改自身狀態為LEADING或FOLLOWING 5.Obserer機器不參與選舉
在分散式應用, 往往存在多個程序提供同一服務. 這些程序有可能在相同的機器上, 也有可能分佈在不同的機器上. 如果這些程序共享了一些資源, 可能就需要分散式鎖來鎖定對這些資源的訪問. 程序需要訪問共享資料時, 就在"/locks"節點下建立一個sequence型別的子節點, 稱為thisPath. 當thisPath在所有子節點中最小時, 說明該程序獲得了鎖. 程序獲得鎖之後, 就可以訪問共享資源了. 訪問完成後, 需要將thisPath刪除. 鎖由新的最小的子節點獲得. 有了清晰的思路之後, 還需要補充一些細節. 程序如何知道thisPath是所有子節點中最小的呢? 可以在建立的時候, 通過getChildren方法獲取子節點列表, 然後在列表中找到排名比thisPath前1位的節點, 稱為waitPath, 然後在waitPath上註冊監聽, 當waitPath被刪除後, 程序獲得通知, 此時說明該程序獲得了鎖. 上述的方案並不安全. 假設某個client在獲得鎖之前掛掉了, 由於client建立的節點是ephemeral型別的, 因此這個節點也會被刪除, 從而導致排在這個client之後的client提前獲得了鎖. 此時會存在多個client同時訪問共享資源. 如何解決這個問題呢? 可以在接到waitPath的刪除通知的時候, 進行一次確認, 確認當前的thisPath是否真的是列表中最小的節點.