rpc系列-ZooKeeper
阿新 • • 發佈:2018-12-12
一.簡介
Zookeeper是一個分散式協調服務,就是為使用者的分散式應用程式提供協調服務。
Zookeeper本身就是一個分散式程式(只要有半數以上節點存活,zk就能正常服務)。
Zookeeper所提供的服務涵蓋:主從協調、伺服器節點動態上下線、統一配置管理、分散式共享鎖、統一名稱服務……
雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能:
管理(儲存,讀取)使用者程式提交的資料;
併為使用者程式提供資料節點監聽服務;
Zookeeper叢集的角色: Leader 和 follower (Observer)
只要叢集中有半數以上節點存活,叢集就能提供服務
二.結構
特性
1.Zookeeper:一個leader,多個follower組成的叢集
2.全域性資料一致:每個server儲存一份相同的資料副本,client無論連線到哪個server,資料都是一致的
3.分散式讀寫,更新請求轉發,由leader實施
4.更新請求順序進行,來自同一個client的更新請求按其傳送順序依次執行
5.資料更新原子性,一次資料更新要麼成功,要麼失敗
6.實時性,在一定時間範圍內,client能讀到最新資料
結構
1.層次化的目錄結構,命名符合常規檔案系統規範(見下圖)
2.每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識
3.節點Znode可以包含資料和子節點(但是EPHEMERAL型別的節點不能有子節點)
4.客戶端應用可以在節點上設定監視器
節點型別
1.Znode有兩種型別:
短暫(ephemeral)(斷開連線自己刪除)
持久(persistent)(斷開連線不刪除)
2.Znode有四種形式的目錄節點(預設是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL
3.建立znode時設定順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護。
4.在分散式系統中,順序號可以被用於為所有的事件進行全域性排序,這樣客戶端可以通過順序號推斷事件的順序。
三.原理
zookeeper的選舉機制(叢集paxos)
以一個簡單的例子來說明整個選舉的過程. 假設有五臺伺服器組成的zookeeper叢集,它們的id從1-5,同時它們都是最新啟動的,也就是沒有歷史資料,在存放資料量這一點上,都是一樣的.假設這些伺服器依序啟動,來看看會發生什麼. 1. 伺服器1啟動,此時只有它一臺伺服器啟動了,它發出去的報沒有任何響應,所以它的選舉狀態一直是LOOKING狀態。
2.伺服器2啟動,它與最開始啟動的伺服器1進行通訊,互相交換自己的選舉結果,由於兩者都沒有歷史資料,所以id值較大的伺服器2勝出,但是由於沒有達到超過半數以上的伺服器都同意選舉它(這個例子中的半數以上是3),所以伺服器1,2還是繼續保持LOOKING狀態。
3.伺服器3啟動,根據前面的理論分析,伺服器3成為伺服器1,2,3中的老大,而與上面不同的是,此時有三臺伺服器選舉了它,所以它成為了這次選舉的leader。
4. 伺服器4啟動,根據前面的分析,理論上伺服器4應該是伺服器1,2,3,4中最大的,但是由於前面已經有半數以上的伺服器選舉了伺服器3,所以它只能接收當小弟的命了。 5.伺服器5啟動,同4一樣,當小弟.
非全新叢集的選舉機制(資料恢復)
那麼,初始化的時候,是按照上述的說明進行選舉的,但是當zookeeper運行了一段時間之後,有機器down掉,重新選舉時,選舉過程就相對複雜了。
需要加入資料id、leader id和邏輯時鐘。
資料id:資料新的id就大,資料每次更新都會更新id。
Leader id:就是我們配置的myid中的值,每個機器一個。
邏輯時鐘:這個值從0開始遞增,每次選舉對應一個值,也就是說: 如果在同一次選舉中,那麼這個值應該是一致的 ; 邏輯時鐘值越大,說明這一次選舉leader的程序更新。
選舉的標準就變成:
1.邏輯時鐘小的選舉結果被忽略,重新投票
2.統一邏輯時鐘後,資料id大的勝出
3.資料id相同的情況下,leader id大的勝出
根據這個規則選出leader。
四.示例
Zookeeper的監聽器工作機制
監聽器是一個介面,我們的程式碼中可以實現Wather這個介面,實現其中的process方法,方法中即我們自己的業務邏輯
監聽器的註冊是在獲取資料的操作中實現:
getData(path,watch)監聽的事件是:節點資料變化事件
getChildren(path,watch)監聽的事件是:節點下的子節點增減變化事件
服務端
public class DistributedServer {
private static final String host = "localhost:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers/";
private ZooKeeper zk = null;
/**
* 建立到zk的客戶端連線
*
* @throws Exception
*/
public void getConnect() throws Exception {
zk = new ZooKeeper(host, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回撥函式
System.out.println(event.getType() + "__" + event.getPath());
try {
zk.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 向zk叢集註冊伺服器資訊
* ZooDefs.Ids.OPEN_ACL_UNSAFE 預設匿名許可權,許可權scheme id:'world,'anyone,許可權位:31(adcwr)
* ZooDefs.Ids.READ_ACL_UNSAFE 只讀許可權,許可權scheme id:'world,'anyone,許可權位:1(r)
*
* CreateMode
* 節點型別,型別定義在列舉CreateMode中:
* (1)PERSISTENT:持久;
* (2)PERSISTENT_SEQUENTIAL:持久順序;
* (3)EPHEMERAL:臨時;
* (4)EPHEMERAL_SEQUENTIAL:臨時順序。
* @param data 建立節點初始化內容
* @throws Exception
*/
public void registerServer(String data) throws Exception {
String create = zk.create(parentNode + "test", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(data + " 註冊節點 " + create);
}
/**
* 業務功能
*
* @throws InterruptedException
*/
public void handleBussiness(String data) throws InterruptedException {
System.out.println(data + "開始handleBussiness");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 獲取zk連線
DistributedServer server = new DistributedServer();
server.getConnect();
// 利用zk連線註冊伺服器資訊
server.registerServer("test01");
// 啟動業務功能
server.handleBussiness("test01");
}
}
分散式鎖
/**
*
*分散式鎖:幾種實現方式,示例用的是用臨時順序節點實現共享鎖的一般做法
*
* 邏輯
* 1.zk上註冊一個"臨時+序號"的znode,並監聽父節點
* 2.獲取父節點下所有程式子節點,比較序號大小
* 3.序號最小的獲取到"鎖",去訪問資源,訪問完後,刪除自己的節點,釋放鎖,重新註冊一個新的子節點
* 4.其他程式節點會收到事件通知,可以去zk上獲取鎖
*/
public class DistributedClientLock {
// 會話超時
private static final int SESSION_TIMEOUT = 2000;
// zookeeper叢集地址
private String hosts = "localhost:2181";
private String groupNode = "servers";
private String subNode = "test";
private boolean haveLock = false;
private ZooKeeper zk;
/**
* 記錄自己建立的子節點路徑
* volatile 不是執行緒安全的,具有可見性,在一個子記憶體操作完後,立即重新整理回到主記憶體。
* 如果不加Volatile,每次呼叫thisPath,會有副本,修改會有延遲,比如其它執行緒搶到沒有修改完的資料,就在新的執行緒繼續執行,造成最後資料值有誤
* 比如:一個執行緒寫,其它執行緒去讀的時候,用的Volatile,比如監聽新節點插入。
*/
private volatile String thisPath;
/**
* 連線zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
System.out.println(event.getType()+"____"+event.getPath());
/**
* 判斷事件型別,此處只處理子節點變化事件
* event For “/path” event For “/path/child”
* create(“/path”) EventType.NodeCreated 無
* delete(“/path”) EventType.NodeDeleted 無
* setData(“/path”) EventType.NodeDataChanged 無
* create(“/path/child”) EventType.NodeChildrenChanged(getChild) EventType.NodeCreated
* delete(“/path/child”) EventType.NodeChildrenChanged(getChild) EventType.NodeDeleted
* setData(“/path/child”) 無 EventType.NodeDataChanged
*/
if (event.getType() == Event.EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//獲取子節點,並對父節點進行監聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比較是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//訪問共享資源處理業務,並且在處理完成之後刪除鎖
doSomething();
//重新註冊一把新的鎖
thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 程式一進來就先註冊一把鎖到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會,便於觀察
Thread.sleep(new Random().nextInt(1000));
// 從zk的鎖父目錄下,獲取所有子節點,並且註冊對父節點的監聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果爭搶資源的程式就只有自己,則可以直接去訪問共享資源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 處理業務邏輯,並且在最後釋放鎖
*/
private void doSomething() throws Exception {
try {
System.out.println("鎖: " + thisPath);
Thread.sleep(2000);
} finally {
System.out.println("完成: " + thisPath);
//刪除當前節點
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}