1. 程式人生 > >Zookeeper客戶端學習

Zookeeper客戶端學習

package com.test.com.test.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @author Jane       E-mail:
[email protected]
* @version Zookeeper客戶端示例及原始碼分析 建立時間:2018/10/3 */ /** * * 本內容完全是關於zk的客戶端,其中例子的執行需要在本地啟動一個zkServer叢集。 * * zk 建議全部通過非同步機制進行回撥處理來提高效能,從而無需捕獲異常(只需根據狀態碼進行相應處理)。 * zk通過單執行緒(eventThread)處理所有的回撥通知(非同步回撥,監聽回撥),所以在回撥處理過多的情況下,可能造成較高的延遲。 * * Zookeeper類提供了一系列與zkServer互動的方法,並關聯了ClientCnxn客戶端,此客戶端開啟一個與zkServer的會話,並關聯sendThread和eventThread用於對傳送請求和回撥事件進行處理, * ClientCnxnSocketNIO通過NIO的方式對socket進行處理。 * * eventThread負責從其維護的阻塞佇列中獲取event,如果是DeathEvent則關閉此執行緒,否則根據事件型別進行解析,呼叫回撥結構的不同過載(事件監聽回撥,非同步通知回撥)。各種回撥都被封裝成實現Event介面的物件 * sendThread負責與zkServer的連線,並負責心跳和重連。然後呼叫doTransport()方法。非同步的方式,客戶端將回調處理器包裝成一個Packet物件,並將該物件加入outgoing佇列中。 * doTransport()方法通過NIO select獲取一次觸發的所有事件。 * 如果channel可讀,則從讀buffer中讀取內容,然後呼叫sendThread的readResponse方法,然後根據獲取訊息的響應頭執行不同處理 * Xid為-2:心跳響應 * Xid為-4:認證授權 * Xid為-1:監聽事件通知,讀取事件的response,並將其提交到eventThread的佇列中(將回調相應Watcher介面的實現處理器)。 * 為sasl:獲取token然後傳送給服務端 * 為回撥通知:客戶端認為除以上所有情況外,都屬於回撥的通知。從pending佇列中取出packet進行以下處理 * 1. 如果取出的Xid與服務端響應的Xid不一致,則認為傳送連線丟失。 * 2. 將服務端返回的訊息,根據協議填回packet欄位中 * 3. 如果沒有回撥介面則notifyAll,如果有回撥則將packet加入eventThread的阻塞佇列中。 * * 如果channel可寫,則從outgoing佇列中取出一個Packet物件,將buffer內容寫入socket中,然後將此packet物件交與pending佇列中。 * * <p> * zk客戶端提供了重連機制,如果心跳檢測網路斷開,會一直嘗試重新連線(連線原伺服器,或者叢集中的其他伺服器),直到客戶端程序被殺死。 * <p> * zk客戶端重新連線,但是服務端已經將此會話置為無效,則必須重新new Zookeeper控制代碼,與服務端開啟一個新的會話。 * 優雅的關閉與服務端的會話,需要呼叫 Zookeeper.close(); * <p> * 對於外部資源的訪問,需要使用隔離符(czxid)使用者接受最新的會話的請求,如果Czxid落後則不允許訪問。 */ public class Master implements Watcher { // Watcher介面實現一個監聽,在客戶端啟動後回撥 private ZooKeeper zk; private String hostPort; Master(String hostPort) { this.hostPort = hostPort; } void startZK() throws IOException { zk = new ZooKeeper(hostPort, 15000, this); } @Override public void process(WatchedEvent watchedEvent) { System.out.println("收到監聽通知:" + watchedEvent); } /** * 同步呼叫建立znode * 必須要自己解決受檢異常 * @param path * @param value */ public void createZnode(String path, String value) { try { String str = this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(str); } catch (KeeperException e) { System.out.println("check一下,看那個程序建立了子節點"); System.out.println(String.valueOf(getData(path))); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 非同步的方式建立Znode. * CreateMode.EPHEMERAL只會建立臨時節點 客戶端會話關閉自動刪除 * CreateMode.PERSISTENT會建立持久化節點 * 在回撥處理中,根據異常返回碼進行相應處理 * @param path * @param value */ public void createZnodeAsync(String path, String value) { this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @Override public void processResult(int i, String s, Object o, String s1) { System.out.println(i + " " + s + " " + o + " " + s1); } }, new Object()); } /** * 獲取某個節點的資料,即建立時設定的值 * * @param path * * @return */ public String getData(String path) { byte[] result = null; try { result = zk.getData(path, false, new Stat()); } catch (KeeperException e) { System.out.println("繼續check或者報警"); } catch (InterruptedException e) { e.printStackTrace(); } return String.valueOf(result); } /** * 查詢某個節點是否存在,如果存在則設定刪除節點監聽。 * 帶Watcher監聽器 和 非同步回撥處理器 * @param path */ public void exist(String path) { Stat s = new Stat(); zk.exists(path, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("設定監視節點刪除事件回撥"); } }, new AsyncCallback.StatCallback() { @Override public void processResult(int i, String s, Object o, Stat stat) { System.out.println("exist回撥處理器"); // 根據不同的返回異常碼,做相應的處理,對於連線丟失事件需要重試。 switch (KeeperException.Code.get(i)) { case OK: System.out.println("處理成功"); break; case NOTEMPTY: System.out.println("已經存在"); break; case CONNECTIONLOSS: System.out.println("連線丟失,可能傳送時丟失,可能接收時丟失,建議重試"); break; } } }, s); } /** * 獲取某個路徑下的子節點,並設定監聽,如果子節點變化,則回撥處理器 * * @param path */ public void getChild(String path) { zk.getChildren(path, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("getChild設定監聽,如果子節點變動,則回撥此處理器"); } }, new AsyncCallback.ChildrenCallback() { @Override public void processResult(int i, String s, Object o, List<String> list) { System.out.println("getChild返回結果回撥"); switch (KeeperException.Code.get(i)) { case OK: System.out.println("處理成功"); for (String str : list) { System.out.println(str); } } } }, new Object()); } public static void main(String args[]) throws Exception { Master m = new Master("127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184"); // 建立控制代碼,並與zookeeper Server建立會話連線 // 這個建立會話也是非同步執行的 m.startZK(); //m.createZnode("/master", "hello world"); //m.createZnodeAsync("/slave","hello world"); // 如果存在則在回撥中處理,並設定監聽,當伺服器刪除節點時候收到回撥通知。 //m.exist("/slave"); System.out.println(m.getData("/slave")); // 獲取某個路徑下所有子節點,並設定監聽,如果子節點更改,則回撥監聽事件。 m.getChild("/slave"); Thread.sleep(60000); m.closeZK(); } public void closeZK() { try { this.zk.close(); } catch (InterruptedException e) { System.out.println("關閉zk失敗"); } } }