1. 程式人生 > >Zookeeper內部分析

Zookeeper內部分析

   導語:分析Zookeeper內部如何做到分散式資料一致性,將從系統模型、序列化與協議、客戶端工作原理、會話、服務端工作原理及資料儲存來分析Zookeeper技術底層實現。

系統模型:

首先從資料模型,節點特性、版本、WatherACL特性來分析Zookeeper的系統模型。

資料模型:

Zookeeper使得分散式程式能夠通過一個共享的、樹形結構的名字空間來進行相互協調。它採用了類似檔案系統的目錄樹型結構的資料模型,我們稱為ZNode節點,ZNodeZookeeper中資料最小的單元,每個節點都可以儲存資料同時也可以掛載子節點。因此構成層次化的名稱空間,類似於節點數

    ZNode:

    zookeeper名字空間由節點znode構成,其組織方式類似檔案系統,其中各個節點相當於目錄和檔案,通過路徑作為唯一標識。

   

事務ID

在傳統資料庫中事務具有所謂的ACID特性:即原子性、一致性、隔離性、和永續性。

   在Zookeeper中,事務是指能夠改變Zookeeper伺服器狀態的操作,稱為事務操作或者更新操作。對於每一個事務的請求,Zookeeper都會為其分配一個全域性唯一的事務ID(ZXID),通常是一個64位的數字。每一個ZXID對應一次更新操作,Zookeeper根據這些全域性唯一的ZXID請求來處理更新請求的全域性順序。

節點特性:

節點型別:

       ZooKeeper

節點是有生命週期的,這取決於節點的型別。在ZooKeeper中,節點型別可以分為持久節點(PERSISTENT)、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL),具體在節點建立過程中,一般是組合使用,可以生成以下4種節點型別。

持久節點(PERSISTENT

所謂持久節點,是指在節點建立後,就一直存在,直到有刪除操作來主動清除這個節點——不會因為建立該節點的客戶端會話失效而消失

持久順序節點(PERSISTENT_SEQUENTIAL

ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點建立的先後順序。基於這個特性,在建立子節點的時候,可以設定這個屬性,那麼在建立節點過程中,ZK

會自動為給定節點名加上一個數字字尾,作為新的節點名。這個數字字尾的範圍是整型的最大值。

臨時節點(EPHEMERAL

和持久節點不同的是,臨時節點的生命週期和客戶端會話繫結。也就是說,如果客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連線斷開。另外,在臨時節點下面不能建立子節點,注意是更具Session會話的失效時間來設定的。

臨時順序節點(EPHEMERAL_SEQUENTIAL

臨時順序節點的特性和臨時節點一致,同時是在臨時節點的基礎上,添加了順序的特性。

  狀態資訊:

Zookeeper維護資料節點的同時,每個節點除了儲存資料內容之外,還儲存了資料節點本身一些狀態資訊,如圖所示:

     

第一行是當前節點的資料內容,第二行開始就是節點的狀態資訊,Zookeeper客戶端Stat類的資料結構:

     

    ZooKeeper中每個znodeStat結構體由下述欄位構成:

lczxid:建立節點的事務的zxid

lmzxid:對znode最近修改的zxid

lctime:以距離時間原點(epoch)的毫秒數表示的znode建立時間

lmtime:以距離時間原點(epoch)的毫秒數表示的znode最近修改時間

lversionznode資料的修改次數

lcversionznode子節點修改次數

laversionznodeACL修改次數

lephemeralOwner:如果znode是臨時節點,則指示節點所有者的會話ID;如果不是臨時節點,則為零。

ldataLengthznode資料長度。

lnumChildrenznode子節點個數。

  版本:

用於保證分散式資料原子性操作。

      Zookeeper會為每個Znode維護一個叫作Stat的資料結構,結構如圖:

版本型別

說明

version

當前資料節點資料內容的版本號

cversion

當前資料節點子節點的版本號

aversion

當前資料節點ACL變更版本號

version是表示對資料節點資料內容的變更次數,強調的是變更次數,因此就算資料內容的值沒有發生變化,version的值也會遞增。

     在介紹version時,我們可以簡單的瞭解在資料庫技術中,通常提到的悲觀鎖樂觀鎖

悲觀鎖:具有嚴格的獨佔和排他特性,能偶有效的避免不同事務在同一資料併發更新而造成的資料一致性問題。實現原理就是:假設A事務正在對資料進行處理,那麼在整個處理過程中,都會將資料處於鎖定的狀態,在這期間,其他事務將無法對這個資料進行更新操作,直到事務A完成對該資料的處理,釋放對應的鎖。一份資料只會分配一把鑰匙,如資料庫的表鎖或者行鎖(for update).

樂觀鎖:具體實現是,表中有一個版本欄位,第一次讀的時候,獲取到這個欄位。處理完業務邏輯開始更新的時候,需要再次檢視該欄位的值是否和第一次的一樣。如果一樣更新,反之拒絕。

     Zookeeper的版本作用就是類似於樂觀鎖機制,用於實現樂觀鎖機制的寫入校驗

     在Zookeeper的內部實現,Zookeeper通過鏈式的processor來處理業務請求,每個processor負責處理特定的功能。不同的Zookeeper角色的伺服器processor鏈是不一樣的,在PrepRequestProcessor請求處理鏈中,在處理資料更新的時候會去檢查版本。

   PrepRequestProcessor

     

Watcher-資料變更通知:

      

    ZookeeperWatcher機制主要包括客戶端執行緒、客戶端WatchManagerZookeeper伺服器三部分。在具體的流程上,客戶端向Zookeeper伺服器註冊Watcher事件監聽的同時,會將Watcher物件儲存在 客戶端WatchManager中。當Zookeeper伺服器觸發Watcher事件後,會向客戶端傳送通知,客戶端執行緒從WatchManager中取出對應的Watcher物件執行回撥邏輯。

客戶端註冊Watcher原始碼剖析

   在開始下面的講解之前,先了解一下幾個概念:

   Watcher

Watcher介面類用於表示一個標準的事件處理器,定義事件通知的相關邏輯,包含KeeperStateEventType兩個列舉,分別代表通知狀態和事件型別,同時定義事件的回撥方法:process方法;

  WatcherEvent

包含每一事件的三種基本屬性:通知狀態、事件型別、節點路徑。Zookeeper使用Watcher物件來封裝伺服器端事件,並傳遞給Watcher,從而方便回撥方法process對伺服器事件進行處理。


  Packet

PacketZookeeper中用來通訊的最小單元,所以任何需要網路進行傳輸的物件都需要包裝成Packet物件。

  SendThead

  SendThreadZookeeper中專門用來接收事件通知的執行緒,當服務端響應了客戶端的請求後,會交給SendThread處理。

  EventThread

   EventThreadZookeeper專門用來處理事件通知的執行緒,SendThread在接收到通知事件後會將事件傳給EventThread進行處理。

  ServerCnxn

  ServerCnxn代表的是一個客戶端和服務端的連線,客戶端像服務端註冊Watcher的時候,並不會真正將Watcher物件傳遞到服務端,而服務端也僅僅是儲存了與當前客戶端連線的ServerCnxn物件。預設實現是NIOServerCNXN.3.4.0版本開始引用Netty的實現:NettyServerCnxn.

  WatcherManager:

Zookeeper服務端Watcher的管理者,其內部管理的WatchTableWatch2Paths兩個儲存結構,兩個緯度對Watcher進行管理:

        1watchTable是資料節點路徑粒度來託管Watch

         2watch2Paths是從Watcher的粒度來控制事件觸發需要觸發的資料節點

   Watcher介面

同一個事件的型別在不同的通知狀態中代表的含義有所不同:


     NodeDataChanged事件:此處的變更包括資料節點內容和資料的版本號DateVersion。因此,對於Zookeeper來說,無論資料內容是否更改,

還是會觸發這個事件的通知,一旦客戶端呼叫了資料更新介面,且更新成功,就會更新dataversion值。

      nodeChildrenCahnged事件會在資料節點的子節點列表發生 變更的時候被觸發,這裡說的子節點列表變化特指子節點個數和組成情況的變更,如新增和刪除,而子節點內容的變化是不會觸發這個事件的。

     AuthFailed這個事件觸發的條件並不是客戶端會話沒有許可權,而是授權失敗,就是使用了錯誤的schema進行授權。

    process方法是Watcher介面中的一個回撥方法,當ZooKeeper向客戶端傳送一個Watcher事件通知時,客戶端就會對相應的process方法進行回撥,從而實現對事件的處理。

abstractpublicvoidprocess(WatchedEvent event);

process方法包含WatcherEvent型別的引數,WatchedEvent包含了每一個事件的三個基本屬性:通知狀態(KeeperState)、事件型別(EventType)和節點路徑(Path),ZooKeeper使用WatchedEvent物件來封裝服務端事件並傳遞給Watcher,從而方便回撥方法process對服務端事件進行處理。

WatchedEventWatcherEvent都表示的是同一個事物,都是對一個服務端事件的封裝。不同的是,WatchedEvent是一個邏輯事件,用於服務端和客戶端程式執行過程中所需的邏輯物件,而WatcherEvent因為實現了序列化介面,因此可以用於網路傳輸。

服務端線上程WatchedEvent事件之後,會呼叫getWrapper方法將自己包裝成一個可序列化的WatcherEvent事件,以便通過網路傳輸到客戶端。客戶端在接收到服務端的這個事件物件後,首先會將WatcherEvent事件還原成一個WatchedEvent事件,並傳遞給process方法處理,回撥方法process根據入參就能夠解析出完整的服務端事件了。

注:Zookeeper Watcher的一個重要特性:客戶端無法直接從WatchedEvent事件中獲取到對應資料節點的原始資料內容,以及變更後的資料內容,而是客戶端再次主動去重新獲取資料。

Zookeeper Watcher工作機制:

客戶端註冊Watcher :


這個Watcher將作為整個ZooKeeper會話期間的預設Watcher,會一直被儲存在客戶端ZKWatchManagerdefaultWatcher裡面。另外Zookeeper客戶端也可以通過getDatagetChildrenExist三個介面向Zookeeper伺服器註冊Watcher.

在向getData介面註冊Watcher後,客戶端首先會向客戶端請求request進行標記,將其設定為“使用watch監聽”,同時會封裝一個Watcher的註冊資訊DataWatchRegistration物件,用於暫時儲存資料節點的路徑和Watcher的對應關係;

publicbyte[] getData(final String path, Watcher watcher, Stat stat)

        throws KeeperException, InterruptedException

     {

        final String clientPath = path;

        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path

        WatchRegistration wcb = null;

        if (watcher != null) {

            wcb = newDataWatchRegistration(watcher, clientPath);

        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();

        h.setType(ZooDefs.OpCode.getData);

        GetDataRequest request = new GetDataRequest();

        request.setPath(serverPath);

        request.setWatch(watcher != null);

        GetDataResponse response = new GetDataResponse();

        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

        if (r.getErr() != 0) {

            throw KeeperException.create(KeeperException.Code.get(r.getErr()),

                    clientPath);

        }

        if (stat != null) {

            DataTree.copyStat(response.getStat(), stat);

        }

        return response.getData();
    }
 


ZooKeeper中,Packet是一個最小的通訊協議單元,即資料包。Pakcet用於進行客戶端與服務端之間的網路傳輸,任何需要傳輸的物件都需要包裝成一個Packet物件。在ClientCnxnWatchRegistration也會被封裝到Pakcet中,然後由SendThread執行緒呼叫queuePacket方法把Packet放入傳送佇列中等待客戶端傳送,這又是一個非同步過程,分散式系統採用非同步通訊是一個普遍認同的觀念。

public ReplyHeader submitRequest(RequestHeader h, Record request,

            Record response, WatchRegistration watchRegistration)

            throws InterruptedException {

        ReplyHeader r = new ReplyHeader();

        Packet packet = queuePacket(h, r, request, response, null, null, null,

                    null, watchRegistration);

        synchronized (packet) {

            while (!packet.finished) {

                packet.wait();

            }

        }

        return r;
    }

  Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,

            Record response, AsyncCallback cb, String clientPath,

            String serverPath, Object ctx, WatchRegistration watchRegistration)

    {

        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is

        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),

        // where the packet is actually sent.

        synchronized (outgoingQueue) {

            packet = new Packet(h, r, request, response, watchRegistration);

            packet.cb = cb;

            packet.ctx = ctx;

            packet.clientPath = clientPath;

            packet.serverPath = serverPath;

            if (!state.isAlive() || closing) {

                conLossPacket(packet);

            } else {

                // If the client is asking to close the session then

                // mark as closing

                if (h.getType() == OpCode.closeSession) {

                    closing = true;

                }

                outgoingQueue.add(packet);

            }

        }

        sendThread.getClientCnxnSocket().wakeupCnxn();

        return packet;
    }
  
<span style="font-size:18px;">ClientCnXn ----->   sendThread = new SendThread(clientCnxnSocket); 傳送</span>

   隨後、Zookeeper客戶端就會向服務端傳送請求,同時等待請求的返回,完成請求傳送後,會有客戶端SendThread執行緒的readResponse方法接收來自服務端的響應,非同步地呼叫finishPacket方法從Packet中取出對應的Watcher並註冊到ZKWatchManager中去:

void readResponse(ByteBuffer incomingBuffer) throws IOException {

           ............

            } finally {

                finishPacket(packet);

            }

 }

privatevoid finishPacket(Packet p) {

        if (p.watchRegistration != null) {

            p.watchRegistration.register(p.replyHeader.getErr());

        }

        if (p.cb == null) {

           .........

        } else {

            p.finished = true;

            eventThread.queuePacket(p);

        }

   }

     從上面內容中,我們可以瞭解到客戶端已經將Watcher暫時封裝在watchRegistration物件中,現在需要從這個封裝物件再次提取出Watcher來:

 abstractprotected Map<String, Set<Watcher>> getWatches(int rc);

        /**

         * Register the watcher with the set of watches on path.

         * @param rc the result code of the operation that attempted to

         * add the watch on the path.

         */

        publicvoidregister(int rc) {

            if (shouldAddWatch(rc)) {

                Map<String, Set<Watcher>> watches = getWatches(rc);

                synchronized(watches) {

                    Set<Watcher> watchers = watches.get(clientPath);

                    if (watchers == null) {

                        watchers = new HashSet<Watcher>();

                        watches.put(clientPath, watchers);

                    }

                    watchers.add(watcher);

                }

            }

        }

register方法中,客戶端會將之前暫時儲存的Watcher物件轉交給ZKWatchManager,並最終儲存到dataWatches中,ZKWatchManager. dataWatches是一個Map<String, Set<Watcher>>型別的資料結構,用於將資料節點的路徑與Watcher物件對映後管理起來。

   整個客戶端Watcher註冊流程:

  

問題:

 1、客戶端每呼叫一次getData()介面,就會註冊上一次Watcher,那麼這些Watcher實體都會被客戶端請求傳送值服務端嗎?

答:並不是;如果所有的Watcher都傳遞個服務端,那麼服務端肯定會出現記憶體緊張或者其他效能問題,那麼ZookeeperwatchRegistration封裝到了Packet物件中去,在通過SendThread傳送出去,事實上,在底層網路傳輸序列化過程中,並沒有將watchRegistration物件完全的序列化到底層位元組陣列中,:

Packet:

publicvoid createBB() {

            try {

                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);

                boa.writeInt(-1, "len"); // We'll fill this in later

                if (requestHeader != null) {

                    requestHeader.serialize(boa, "header");

                }

                if (request instanceof ConnectRequest) {

                    request.serialize(boa, "connect");

                    // append "am-I-allowed-to-be-readonly" flag

                    boa.writeBool(readOnly, "readOnly");

                } else if (request != null) {

                    request.serialize(boa, "request");

                }

                baos.close();

                this.bb = ByteBuffer.wrap(baos.toByteArray());

                this.bb.putInt(this.bb.capacity() - 4);

                this.bb.rewind();

            } catch (IOException e) {

                LOG.warn("Ignoring unexpected exception", e);

            }
        }

    只是將requestHeaderrequest兩個屬性進行序列化傳輸。

服務端處理Watcher:

當服務端收到了客戶端的請求後,如果客戶端標記了需要使用Watcher監聽,服務端會觸發相應的事件,整個主幹流程很簡單,可以簡單理解為下圖的方式:

   

      FinalRequestProcessor類接收到客戶端請求後,會呼叫processRequest方法進行處理中會判斷當前請求是否需要註冊Watcher:

case OpCode.getData: {

                lastOp = "GETD";

                GetDataRequest getDataRequest = new GetDataRequest();

                ByteBufferInputStream.byteBuffer2Record(request.request,

                        getDataRequest);

                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());

                if (n == null) {

                    thrownew KeeperException.NoNodeException();

                }

                Long aclL;

                synchronized(n) {

                    aclL = n.acl;

                }

                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),

                        ZooDefs.Perms.READ,

                        request.authInfo);

                Stat stat = new Stat();

                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,

                        getDataRequest.getWatch() ? cnxn : null);

                rsp = new GetDataResponse(b, stat);

                break;
            }

     對於註冊Watcher請求,FinalRequestProcessorProcessRequest方法會判斷當前請求是否需要註冊Watcher,如果為true,就會將當前的ServerCnxn物件和資料節點路徑傳入getData方法中去。ServerCnxn是一個ZooKeeper客戶端和伺服器之間的連線介面,代表了一個客戶端和伺服器的連線,我們後面講到的process回撥方法,實際上也是從這裡回撥的,所以可以把ServerCnxn看作是一個Watcher物件。資料節點的節點路徑和ServerCnxn最終會被儲存在WatchManagerwatchTablewatch2Paths中。在服務端,DataTree會託管兩個WatchManager、分別是dataWatcheschildWatches

        


     對服務端Watcher的觸發:當發生CreateDeleteNodeChange(資料變更)這樣的事件後,DataTree會呼叫相應方法去觸發WatchManagertriggerWatch方法,該方法返回ZNODE的資訊,自此進入到回撥本地process的序列。

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

    {

        ProcessTxnResult rc = new ProcessTxnResult();