1. 程式人生 > >zookeeper基礎知識整理

zookeeper基礎知識整理

Zookeeper應用簡單例子

Zookeeper能幫我們作什麼事情呢?簡單的例子:假設我們我們有個20個搜尋引擎的伺服器(每個負責總索引中的一部分的搜尋任務)和一個總伺服器(負責向這20個搜尋引擎的伺服器發出搜尋請求併合並結果集),一個備用的總伺服器(負責當總伺服器宕機時替換總伺服器),一個web的cgi(向總伺服器發出搜尋請求).搜尋引擎的伺服器中的15個伺服器現在提供搜尋服務,5個伺服器正在生成索引.這20個搜尋引擎的伺服器經常要讓正在提供搜尋服務的伺服器停止提供服務開始生成索引,或生成索引的伺服器已經把索引生成完成可以搜尋提供服務了.使用Zookeeper可以保證總伺服器自動感知有多少提供搜尋引擎的伺服器並向這些伺服器發出搜尋請求,備用的總伺服器宕機時自動啟用備用的總伺服器,web的cgi能夠自動地獲知總伺服器的網路地址變化.這些又如何做到呢?

  1.  提供搜尋引擎的伺服器都在Zookeeper中建立znode,zk.create("/search/nodes/node1",
    "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);
  2. 總伺服器可以從Zookeeper中獲取一個znode的子節點的列表,zk.getChildren("/search/nodes",true);
  3. 總伺服器遍歷這些子節點,並獲取子節點的資料生成提供搜尋引擎的伺服器列表.
  4. 當總伺服器接收到子節點改變的事件資訊,重新返回第二步.
  5. 總伺服器在Zookeeper中建立節點,zk.create("/search/master","hostname".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);
  6. 備用的總伺服器監控Zookeeper中的"/search/master"節點.當這個znode的節點資料改變時,把自己啟動變成總伺服器,並把自己的網路地址資料放進這個節點.
  7. web的cgi從Zookeeper中"/search/master"節點獲取總伺服器的網路地址資料並向其傳送搜尋請求.
  8. web的cgi監控Zookeeper中的"/search/master"節點,當這個znode的節點資料改變時,從這個節點獲取總伺服器的網路地址資料,並改變當前的總伺服器的網路地址.

Zookeeper簡單整理(《Hadoop in action》,也可以參考《Hadoop權威指南》裡的zookeeper篇)

當一個ZooKeeper例項被建立之後, 它啟動一個執行緒連線到ZooKeeper伺服器, 對建構函式的響應返回的很快, 因此在使用ZooKeeper物件錢等待建立連線非常重要, 因此藉助併發包中的CountDownLatch來阻塞, 直到ZooKeeper例項準備好.

客戶端連線到ZooKeeper之後, Watcher的process()方法被呼叫, 並收到一個事件, 表明連線已經完成. 在收到連線事件時(由Watcher.Event.KeeperState列舉型表示, 並帶有值SyncConnected), CountDownLatch的countDown()方法被呼叫, 計數減一, 即釋放等待執行緒, 表明連線建立, 可以自行其他操作了. 比如建立znode.

ZooKeeper可以提供一種高可用, 高效能的協作服務.

ZooKeeper是為協作而設計的(通常使用小資料檔案), 不是大容量的資料儲存, 因此任何一個znode的資料儲存量的上限是1MB

ZooKeeper上的資料訪問都是原子的. 不可能出現部分資料被客戶端寫入, ZooKeeper不支援追加操作.

znode的路徑必須是絕對, 因此, 他們必須由反斜槓字元開頭. 除此之外, 他們還必須是唯一的.

zookeeper在路徑中是保留字, /zookeeper用來儲存管理資訊, 比如一些配額資訊.

znode可以分為兩種: 臨時的和永久的. znode型別是在建立時指定的, 並且不能被改變. 一個臨時性znode會在建立它的客戶端的會話結束時由ZooKeeper刪除, 一個永久的znode並不依賴客戶端會話, 而且只有在客戶端明確刪除它的時候才會被刪除(不一定是建立的客戶端), 一個臨時的znode不應該有子節點, 即臨時性的子節點.

臨時性znode繫結在客戶端會話上, 它們對所有客戶端都是可見的.

臨時性znode對於建立那些需要知道什麼時候某些分散式資源可以使用的應用非常有效.

在znode有改變時, Watch使客戶端了解相應的資訊. Watch由ZooKeeper服務的操作來設定, 同時由服務的其他操作來觸發. 比如, 一個客戶端可能呼叫znode上的exists操作, 同時在這個節點上加了一個Watch, 如果這個節點不存在, 返回false, 如果一段時間之後, 這個znode由另一臺客戶端建立了, 那麼Watch將被觸發, 通知第一臺客戶端znode被建立的訊息.

Watcher只被觸發一次, 為了獲得多次提醒, 客戶端需要再次註冊Watcher.

更新ZooKeeper的操作是有限制的. delete或setData必須明確需要更新的znode的版本號, 如果版本號不匹配, 更新就會失敗. 更新操作是非阻塞的, 因此客戶端如果失去了一個更新, 它可以在不阻塞其他進行程序執行的情況下, 選擇重新嘗試或進行其他操作.

儘管ZooKeeper可以被看著是一個檔案系統, 但是它處於便利性的考慮, 摒棄了一些檔案系統的操作原語. 因為檔案非常小並且是整體讀寫的, 所以不需要提供開啟, 關閉或定址操作.

ZooKeeper的非同步化API使你能並行處理請求, 在某些場景下可以提供更好的吞吐量. 如果你想讀取大批量znode並且獨立的處理他們, 使用同步api的話, 那麼每個讀操作都會被阻塞, 直到它返回的那一刻, 相反使用非同步化, 可以非常快的執行所有非同步操作, 並且用不同的執行緒來處理響應.

讀操作exists, getChildren和getData都被設定了watch, 並且這些watch都由寫操作來觸發:create, delete和setData. ACL操作並不參與到watch中.

exists操作上的watch在被監視的znode建立, 刪除或資料更新時被觸發
getData操作上的watch在被監視的znode刪除或資料更新時觸發, 在建立時不能觸發, 因為只有znode一定存在, getData操作才會成功.
getChildren操作上的watch在被監視的znode的子節點建立或刪除時, 或者當這個znode節點的子節點被刪除時被觸發. 可以通過檢視watch事件型別來區分是znode, 還是它的子節點被刪除.

ACL


如果我們想要客戶端在example.com域中對znode進行讀訪問, 那麼可以對這個znode的ACL進行設定, 使用host模式, 帶有example.com的ID和READ許可, 在java中可以這樣建立ACL物件:
new ACL(Perms.READ, new Id("host", "example.com"));

exists並不受ACL許可控制.

ZooKeeper以複合模式執行在一組叫做ensemble的叢集上, ZooKeeper通過複製來獲得高可用性, 同時, 只要ensemble中的大部分機器都執行正常就可以提供服務. 比如說, 在一個5節點ensemble中, 可以在任何兩臺機器故障的情況下服務仍在運作, 如果6節點的話, 也只能承受兩臺出現故障, 因此一個ensemble通常選擇奇數臺機器.

ZooKeeper的思想非常簡單: 它所需要做的就是保證對znode樹的每一次修改都複製到ensemble中的大部分機器上去.

ZooKeeper採用了Zab協議, 它分為兩個階段, 並且可能被無限制的重複.
階段1:領導者選舉
在ensemble中的機器要參與一個選擇特殊成員的程序, 這個成員叫做領導者, 其他機器則叫做跟隨者, 在大部分的跟隨者與它們的領導者同步了狀態以後, 這個階段才算完成.
階段2:原子廣播
所有的寫操作請求被傳送給領導者, 並通過廣播將更新資訊告訴給跟隨者, 當大部分跟隨者執行了修改後, 領導者就會提交更新操作, 客戶端將得到更新成功的迴應.

如果領導者出現故障, 剩下的機器將會再次進行領導者選舉, 並在新領導者被選出來之前繼續執行任務. 如果在不久之後, 老的領導者恢復了. 那麼它將以跟隨者的身份繼續執行, 領導者的選舉非常快(200ms左右), 因此不會帶來明顯的延遲.

所有在ensemble中的機器在更新它們的記憶體中的znode樹之前會先將更新資訊寫入磁碟. 讀操作請求可能由任何機器服務, 同時由於它們只涉及到記憶體查詢, 因此非常快.

一致性
在ensemble中的領導者和跟隨者非常聰明, 跟隨者通過來更新號來滯後領導者, 結果導致只要大部分而不是所有ensemble確認更新

每一個對znode樹的更新都會給定一個全域性標識, 叫zxid(ZooKeeper事務id).

ZooKeeper客戶端與ensemble中的伺服器列表配置相一致, 在啟動時, 它嘗試與表中的一個伺服器相連線, 如果連線失敗, 它就嘗試列表中的其他伺服器, 以此類推, 直到最終連線到其中一個, 或者當ZooKeeper的所有伺服器都無法連線時, 連線失敗.

一旦與ZooKeeper伺服器連線成功, 伺服器會建立與客戶端的一個新的會話. 每個會話都會有超時時間, 這個是在會話建立時設定的, 如果伺服器在超時時段內得到請求, 它可能中斷該會話, 一旦會話中斷, 它可能不再開啟, 與該會話相關的臨時性節點都將丟失.

在會話空閒的一定時間內, 都會由客戶端發起ping請求來保持活躍(猶如心跳)(ping是由zk客戶端自動傳送, 不需要由程式來指定), 超時時段要設定的足夠小, 以便能檢測到伺服器故障, 並且能在會話超時時連線到另外一臺伺服器.

建立複雜的臨時性節點狀態的應用, 應該設定更長的會話超時時間, 因為重建這些內容的代價非常昂貴, 在這種情況下, 應用程式可以有更多的時間來重啟, 從而避免會話過期. 每個會話都由伺服器給定一個唯一的身份和密碼, 而且如果在建立連線時傳遞給zk的話, 它就能恢復會話(只要沒有過期), 所以應用程式可以安全關閉, 同時因為儲存了身份和密碼, 它可以重新獲得這個身份和密碼並恢復會話.

一個zk例項一次只能處於一種狀態, 一個zk例項在建立與zk伺服器的建立時, 處於CONNECTING狀態, 一旦連線建立, 它就變成CONNECTED狀態了.

使用zk的客戶端可以通過註冊watcher的方法來獲取狀態的改變的訊息. 一旦進入CONNECTED狀態, watcher將獲得一個KeeperState值為SyncConnected的WatchEvent.

zk的watcher物件有兩個職責, 一個是瞭解zk例項的狀態變化, 一個瞭解znode的改變, 初始化或者在在zk例項讀取znode節點資訊的讀方法(exists方法和getData方法)上指定是否需要watch znode節點變化, 這裡的watcher可以是專門的, 也可以是zk例項建構函式中指定預設的.

zk可以作為配置的高可用儲存, 使應用的參與者可以恢復或更新配置檔案, 使用zk的監測可以建立靈活配置服務, 有需要的客戶端可以以此來了解配置的改變.

    public class ConnectionWatcher implements Watcher{  
        private static final int SESSION_TIMEOUT = 5000;  
        private ZooKeeper zk;  
        private CountDownLatch connectedSignal = new CountDownLatch(1);  
      
          
        public void connect(String hosts) throws IOException, InterruptedException {  
            zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);  
            connectedSignal.await();  
        }  
      
        @Override  
        public void process(WatchedEvent event) {  
            if (event.getState() == KeeperState.SyncConnected) {  
                connectedSignal.countDown();  
            }  
        }  
      
        public void create(String groupName) throws KeeperException, InterruptedException {  
            String path = "/" + groupName;  
            String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
            System.out.println("created" +  createPath);  
        }  
         
        public void close() throws InterruptedException {  
            zk.close();  
        }  
    }  
      
        public class ActiveKeyValueStore extends ConnectionWatcher {  
            private  final Charset CHARSET = Charset.forName("UTF-8");  
            private final int MAX_RETRIES = 3;  
            private final int RETRY_PERIOD_SECONDS = 5;  
             
            public void write(String path, String value) throws KeeperException, InterruptedException {  
                Stat stat = zk.exists(path, false);  
                int retries = 0;  
                while(true) {  
                    try {  
                        if (stat == null) {  
                            zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
                        }else {  
                            zk.setData(path, value.getBytes(CHARSET), -1);  
                        }    
                    }catch(KeeperException.SessionExpiredException e) {  
                        throw e;  
                    }catch(KeeperException e) {  
                        if (retries ++ == MAX_RETRIES) {  
                            throw e;  
                        }  
                        TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);  
                    }  
                }  
            }  
             
            public String read(String path, Watcher watcher) throws KeeperException, InterruptedException {  
                byte[] data = zk.getData(path, watcher, null);  
                return new String(data, CHARSET);  
            }  
        }  
      
    public class ConfigWatcher implements Watcher {  
      
        private ActiveKeyValueStore store;  
        private String path;  
         
        public ConfigWatcher(String hosts) throws IOException, InterruptedException {  
            store = new ActiveKeyValueStore();  
            store.connect(hosts);  
        }  
         
        @Override  
        public void process(WatchedEvent event) {  
            if (event.getType() == EventType.NodeDataChanged) {  
                try {  
                    displayConfig();  
                } catch (KeeperException e) {  
                    e.printStackTrace();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                    Thread.currentThread().interrupt();  
                }  
            }  
        }  
      
        public void displayConfig() throws KeeperException, InterruptedException {  
            System.out.println(store.read(path, this));  
        }  
      
        public static void main(String[] args) throws Exception {  
            ConfigWatcher watcher = new ConfigWatcher(args[0]);  
            watcher.displayConfig();  
             
            Thread.sleep(Long.MAX_VALUE);  
        }  
    } 


zk例項方法大多都帶有一個InterruptException異常, 可以通過呼叫被阻塞執行緒的interrupt()方法來丟擲一個InterruptException異常來取消一個zk操作.

如果zk伺服器傳送錯誤資訊或者伺服器發生通訊故障時, KeeperException將丟擲, KeeperException用不同的子類來對應不同的出錯, 比如KeeperException.NoNoeException在不存在的znode上執行操作將會被丟擲

KeeperException包括三種明確的種類:
狀態異常
通常出現在另一個程序在改變znode, 而當前程序沒有感知到該改變. 例如呼叫setData方法對znode進行更新時, 另一個程序也在更新, 此時將丟擲BadVersionException, 對於這種可能發生的情況, 必須通過編碼重試來避免. 還有一些可能是程式錯誤, 比如在建立一個臨時節點時可能發生NoChildrenEphemeralsException.

可恢復異常
比如在一個會話中, 可能連線丟失, 將觸發ConnectionLossException, 此時可以通過重連來恢復會話, 保證會話的完整性.

冪等操作是指相同的結果可以被一次又一次應用的操作, 比如讀請求或者無條件的setData, 它可以簡單的被重新嘗試.

不可恢復異常
比如建立連線時出現驗證失敗, 會丟擲AuthFailedException. 另外一種就是會話過期, 會丟擲SessionExpireException, 此時狀態為CLOSED, 永遠無法重連. 對於這種情況, 可以通過在watcher中判斷KeeperState的狀態是否為Expired, 如果是則嘗試重新建立連線, 從而保證write方法的重試.

如果zk例項連線到zk伺服器失敗, 會嘗試ensemble中的另一臺, 如果所有伺服器都無法連線, 將丟擲IOException.

zookeeper應該只執行在只負責zookeeper的機器上, 有其他應用程式競爭資源會顯著影響zookeeper的效能.

每一個在ensemble叢集中的zookeeper都有一個在叢集中唯一的數字, 這個數字必須在1~255之間, 這個號碼在dataDir路徑下的純文字檔案myId中.

在zookeeper配置檔案中有這樣一行:

server.n=hostname:port:port

n表示伺服器號, 有兩個埠, 第一個是跟隨者連線到領導者的埠, 第二個是用來選舉領導者的.
比如:

server.1=zookeeper1:2888:3888

zookeeper監聽三個埠: 2181用來監聽zookeeper客戶端連線, 2888, 如果是領導者, 用來監聽跟隨者連線, 3888用來在選舉領導者階段, 用來監聽其他伺服器的連線.

當zookeeper伺服器啟動時, 它讀取myid檔案來判斷這是哪個伺服器, 然後讀取配置檔案來確定它需要監聽哪個埠, 以及ensemble中其他伺服器的網路地址

連線到zookeeper伺服器的客戶端例項, 應該使用"zookeeper1:2181, zookeeper2:2181,zookeeper3:2181"來作為zookeeper例項的構造引數.

在複製模式中, 還有另外兩個屬性: initLimit, syncLimit
initLimit表示跟隨者連線到領導者可以與其同步的時間, 如果在這個時間內跟隨者無法與領導者進行同步, 那麼領導者將放棄領導地位, 重新選舉, 通過檢視日誌, 如果發現這種事情經常發生, 說明這個時間設定短了, 需要加長一些.

syncLimit是允許一個跟隨者與領導者同步的時間, 如果跟隨者在這段時間內不能與領導者同步, 將重啟, 而與該跟隨者的連線將被連線到另外一臺機器上.