[zookeeper]zookeeper系列三:zookeeper中watcher的使用及原理
watcher解決的問題
在進入watcher之前我們先試想在應用伺服器叢集中可能存在的兩個問題:
- 因為叢集中有很多機器,當某個通用的配置發生變化後,怎麼讓自動的讓所有伺服器的配置統一生效?
- 當叢集中某個節點宕機,如何讓叢集中的其他節點知道?
為了解決這兩個問題,zookeeper引入了watcher機制來實現釋出/訂閱功能,能夠讓多個訂閱者同時監聽某一個主題物件,當這個主題物件自身狀態發生變化時,會通知所有訂閱者。
watcher基本原理
zookeeper中實現watcher需要有三個部分,如下圖所示:
分別是zookeeper服務端、客戶端以及客戶端的watchManager。
如圖所示,客戶端向zk註冊watcher的同時,會將客戶端的watcher物件儲存在客戶端的WatchManager中;zk伺服器觸發watch事件後,會向客戶端傳送通知,客戶端執行緒從watchManager中取出對應watcher執行。
客戶端如何實現事件通知的動作
客戶端只需定義一個類實現org.apache.zookeeper.Watcher
介面並實現介面中的如下方法:
abstract public void process(WatchedEvent event);
即可在得到通知後執行相應的動作。引數org.apache.zookeeper.WatchedEvent
是zk服務端傳過來的事件,有三個成員:
final private KeeperState keeperState; // 通知狀態
final private EventType eventType; // 事件型別
private String path; // 哪個節點發生的時間
分別代表通知的狀態、事件型別和發生事件的節點。
keeperState是個列舉物件,代表客戶端和zk伺服器的連結狀態,定義如下:
/**
* Enumeration of states the ZooKeeper may be at the event
*/
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation).
* /
SyncConnected (3),
/**
* Auth failed state
*/
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble.
*/
Expired (-112);
private final int intValue; // Integer representation of value
// for sending over wire
KeeperState(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return intValue;
}
public static KeeperState fromInt(int intValue) {
switch(intValue) {
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
default:
throw new RuntimeException("Invalid integer value for conversion to KeeperState");
}
}
}
eventType也是個列舉型別,代表節點發生的事件型別,比如建立新的子節點、改變節點資料等,定義如下:
/**
* Enumeration of types of events that may occur on the ZooKeeper
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4),
DataWatchRemoved (5),
ChildWatchRemoved (6);
private final int intValue; // Integer representation of value
// for sending over wire
EventType(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return intValue;
}
public static EventType fromInt(int intValue) {
switch(intValue) {
case -1: return EventType.None;
case 1: return EventType.NodeCreated;
case 2: return EventType.NodeDeleted;
case 3: return EventType.NodeDataChanged;
case 4: return EventType.NodeChildrenChanged;
case 5: return EventType.DataWatchRemoved;
case 6: return EventType.ChildWatchRemoved;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
}
}
}
keeperState和eventType對應關係如下所示:
對於NodeDataChanged
事件:無論節點資料發生變化還是資料版本發生變化都會觸發(即使被更新資料與新資料一樣,資料版本都會發生變化)。
對於NodeChildrenChanged
事件:新增和刪除子節點會觸發該事件型別。
需要注意的是:WatchedEvent
只是事件相關的通知,並沒有對應資料節點的原始資料內容及變更後的新資料內容,因此如果需要知道變更前的資料或變更後的新資料,需要業務儲存變更前的資料和呼叫介面獲取新的資料
如何註冊watcher
watcher註冊api
可以在建立zk客戶端例項的時候註冊watcher(構造方法中註冊watcher):
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
ZooKeeper
的構造方法中傳入的watcher將會作為整個zk會話期間的預設watcher,該watcher會一直儲存為客戶端ZKWatchManager
的defaultWatcher
成員,如果有其他的設定,這個watcher會被覆蓋。
除了可以通過ZooKeeper
類的構造方法註冊watcher外,還可以通過ZooKeeper
類中其他一些api來註冊watcher,只不過這些api註冊的watcher就不是預設watcher
了(以下每個註冊watcher的方法有很多個過載的方法,就不一一列舉出來)。
public List<String> getChildren(final String path, Watcher watcher)
// boolean watch表示是否使用上下文中預設的watcher,即建立zk例項時設定的watcher
public List<String> getChildren(String path, boolean watch)
// boolean watch表示是否使用上下文中預設的watcher,即建立zk例項時設定的watcher
public byte[] getData(String path, boolean watch, Stat stat)
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// boolean watch表示是否使用上下文中預設的watcher,即建立zk例項時設定的watcher
public Stat exists(String path, boolean watch)
public Stat exists(final String path, Watcher watcher)
watcher註冊示例程式碼
本示例中使用zookeeper自帶客戶端演示watcher的使用,zookeeper自帶客戶端有一點需要注意:
Watcher設定後,一旦觸發一次即會失效,如果需要一直監聽,則需要再註冊
定義預設watcher:
/**
* 測試預設watcher
*/
public class DefaultWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========DefaultWatcher start==============");
System.out.println("DefaultWatcher state: " + event.getState().name());
System.out.println("DefaultWatcher type: " + event.getType().name());
System.out.println("DefaultWatcher path: " + event.getPath());
System.out.println("==========DefaultWatcher end==============");
}
}
定義監聽子節點變化的watcher:
/**
* 用於監聽子節點變化的watcher
*/
public class ChildrenWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========ChildrenWatcher start==============");
System.out.println("ChildrenWatcher state: " + event.getState().name());
System.out.println("ChildrenWatcher type: " + event.getType().name());
System.out.println("ChildrenWatcher path: " + event.getPath());
System.out.println("==========ChildrenWatcher end==============");
}
}
定義監聽節點變化的watcher:
public class DataWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========DataWatcher start==============");
System.out.println("DataWatcher state: " + event.getState().name());
System.out.println("DataWatcher type: " + event.getType().name());
System.out.println("DataWatcher path: " + event.getPath());
System.out.println("==========DataWatcher end==============");
}
}
watcher測試程式碼:
public class WatcherTest {
/**
* 連結zk服務端的地址
*/
private static final String CONNECT_STRING = "192.168.0.113:2181";
public static void main(String[] args) {
// 除了預設watcher外其他watcher一旦觸發就會失效,需要充新註冊,本示例中因為
// 還未想到比較好的重新註冊watcher方式(考慮到如果在Watcher中持有一個zk客戶端的
// 例項可能存在迴圈引用的問題),因此暫不實現watcher失效後重新註冊watcher的問題,
// 後續可以查閱curator重新註冊watcher的實現方法。
// 預設watcher
DefaultWatcher defaultWatcher = new DefaultWatcher();
// 監聽子節點變化的watcher
ChildrenWatcher childrenWatcher = new ChildrenWatcher();
// 監聽節點資料變化的watcher
DataWatcher dataWatcher = new DataWatcher();
try {
// 建立zk客戶端,並註冊預設watcher
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);
// 讓預設watcher監聽 /GetChildren 節點的子節點變化
// zooKeeper.getChildren("/GetChildren", true);
// 讓childrenWatcher監聽 /GetChildren 節點的子節點變化(預設watcher不再監聽該節點子節點變化)
zooKeeper.getChildren("/GetChildren", childrenWatcher);
// 讓dataWatcher監聽 /GetChildren 節點本省的變化(預設watcher不再監聽該節點變化)
zooKeeper.getData("/GetChildren", dataWatcher, null);
TimeUnit.SECONDS.sleep(1000000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
測試過程:
首先在命令列客戶端建立節點 /GetChildren
[zk: localhost:2181(CONNECTED) 133] create /GetChildren GetChildrenData
Created /GetChildren
執行測試程式碼WatcherTest
,輸出如下內容:
==========DefaultWatcher start==============
DefaultWatcher state: SyncConnected
DefaultWatcher type: None
DefaultWatcher path: null
==========DefaultWatcher end==============
可以看出在客戶端第一次連結zk服務端時觸發了連結成功的事件通知,該事件由預設watcher接收,導致預設watcher相關程式碼得到執行。
接著在命令列客戶端建立子節點:
[zk: localhost:2181(CONNECTED) 134] create /GetChildren/ChildNode ChildNodeData
Created /GetChildren/ChildNode
ChildrenWatcher收到通知,/GetChildren的子節點發生變化,因此輸出如下內容:
==========ChildrenWatcher start==============
ChildrenWatcher state: SyncConnected
ChildrenWatcher type: NodeChildrenChanged
ChildrenWatcher path: /GetChildren
==========ChildrenWatcher end==============
最後在命令列客戶端修改 /GetChildren 節點資料:
[zk: localhost:2181(CONNECTED) 135] set /GetChildren GetChildrenDataV2
cZxid = 0xab
ctime = Sat Sep 15 03:52:48 PDT 2018
mZxid = 0xb0
mtime = Sat Sep 15 04:06:05 PDT 2018
pZxid = 0xaf
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 1
DataWatcher收到通知,輸出如下內容:
==========DataWatcher start==============
DataWatcher state: SyncConnected
DataWatcher type: NodeDataChanged
DataWatcher path: /GetChildren
==========DataWatcher end==============
我們可以接著在命令列客戶端修改 /GetChildren 節點資料:
[zk: localhost:2181(CONNECTED) 136] set /GetChildren GetChildrenDataV3
cZxid = 0xab
ctime = Sat Sep 15 03:52:48 PDT 2018
mZxid = 0xb1
mtime = Sat Sep 15 04:14:54 PDT 2018
pZxid = 0xaf
cversion = 1
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 1
但WatcherTest沒有任何輸出了,說明DataWatcher已經失效了,要能夠繼續出發需要重新註冊。
watcher實現原始碼分析
我們以註冊watcher的 getData api為例,分析watcher的註冊流程,以setData api為例,分析watcher的觸發流程。
getData
的實現在org.apache.zookeeper.ZooKeeper
類中,具體程式碼如下:
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
重點看:
......
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
......
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
......
我們首先看org.apache.zookeeper.ZooKeeper.DataWatchRegistration
和org.apache.zookeeper.ZooKeeper.WatchRegistration
類的實現程式碼:
/**
* Register a watcher for a particular path.
*/
public abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
* add the watch on the path.
*/
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
/**
* Determine whether the watch should be added based on return code.
* @param rc the result code of the operation that attempted to add the
* watch on the node
* @return true if the watch should be added, otw false
*/
protected boolean shouldAddWatch(int rc) {
return rc == 0;
}
}
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
org.apache.zookeeper.ZooKeeper.DataWatchRegistration#getWatches
方法是從org.apache.zookeeper.ZooKeeper.ZKWatchManager
中獲取儲存watcher的一個HashMap
:
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
org.apache.zookeeper.ZooKeeper.WatchRegistration#register
方法顯然是註冊一個watcher,該方法肯定會在後續流程得到呼叫,事實上在getData返回資料並且判斷成功後就會呼叫該方法將watcher加入到ZKWatchManager
中,我們稍後到了這一步流程在分析,這裡先有個大概的瞭解。
我們回到getData傳送請求的程式碼:
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
cnxn
的型別是org.apache.zookeeper.ClientCnxn
,進入到submitRequest方法:
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
先到此為止,等有空繼續完善原始碼分析部分(原始碼分析描述起來太麻煩了)。。。。。。。
相關推薦
[zookeeper]zookeeper系列三:zookeeper中watcher的使用及原理
watcher解決的問題 在進入watcher之前我們先試想在應用伺服器叢集中可能存在的兩個問題: 因為叢集中有很多機器,當某個通用的配置發生變化後,怎麼讓自動的讓所有伺服器的配置統一生效? 當叢集中某個節點宕機,如何讓叢集中的其他節點知道? 為了解決
selenium+testng+gitblit+jenkins+ant自動化測試系列三:本地git安裝及使用
第一步:下載本地git檔案:Git_2.11.1.exe 第二步:安裝Git_2.11.1,一路點選next預設安裝即可,安裝目錄為D:\Program Files (x86)\Git。 第三步:把
ZooKeeper系列之三:ZooKeeper的安裝
ZooKeeper的安裝模式分為三種,分別為:單機模式(stand-alone)、叢集模式和叢集偽分佈模式。ZooKeeper 單機模式的安裝相對比較簡單,如果第一次接觸ZooKeeper的話,建議安裝ZooKeeper單機模式或者叢集偽分佈模式。 1)單機模式 首先,
zookeeper系列之:zookeeper簡介淺談
一、zookeeper的定義 開啟zookeeper官網,赫然一行大字,寫著:“Apache ZooKeeper致力於開發和維護實現高度可靠的分散式協調的開源伺服器”。什麼意思呢?就是Apache ZooKeeper的目標是開發和維護開源伺服器,這伺服器是幹什麼的呢?是做分散式協調的。這伺服器的特點是什麼
HADOOP學習筆記總結三:zookeeper
在學習Hbase時,官方文件說hbase依賴於zookeeper來管理與跟蹤其分散式資料的狀態,hregionserver與hmaster都需要向它註冊。那什麼是zookeeper呢?今天學習一下: 1、zookeeper是什麼 ZooKeeper是一種分散式協調服務,用
[zookeeper]zookeeper系列七:zookeeper選舉及資料一致性
ZAB協議 ZAB(Zookeeper Atomic Broadcast)協議,即Zookeeper原子訊息廣播協議,協議內容大致如下: 所有事物的請求必須由全域性唯一的伺服器來協調處理,這樣的伺服器
ZooKeeper系列(8):ZooKeeper伸縮性
正是 測試 osal 網絡延遲 工作流 link 是我 為什麽 網絡 一、ZooKeeper中Observer 1.1 ZooKeeper角色 經過前面的介紹,我想大家都已經知道了在ZooKeeper集群當中有兩種角色Leader和Follower。Leader可以接受
ZooKeeper系列(9):ZooKeeper實現分布式Barrier和Queue
nod zookeeper instant zook conf protected tint 說了 this 1. 快速開始 1.1概述: Zookeeper是Hadoop的一個子項目,它是分布式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分布式同步、組服
C# 互操作性入門系列(三):平臺調用中的數據封送處理
ask rsh 整數 stat charset ron pan cell 被調用 好文章搬用工模式啟動ing 。。。。。 { 文章中已經包含了原文鏈接 就不再次粘貼了 言明 改文章是一個系列,但只收錄了2篇,原因是 夠用了 } -------------------
ZooKeeper入門(三) ZooKeeper數據模型
每次 con ges 可用 同文件 2.3 per 而是 創建時間 1 簡述 ZooKeeper可以看成一種高可用性的文件系統,但是,它沒有文件和目錄,而是使用節點,稱為znode。 znode可以作為保存數據的容器(如同文件),也可以作為保存其他節點的容器(如同目錄)。
Windows下USB磁碟開發系列三:列舉系統中U盤、並獲取其裝置資訊
前面我們介紹了列舉系統中的U盤碟符(見《Windows下USB磁碟開發系列一:列舉系統中U盤的碟符》)、以及獲取USB裝置的資訊(見《Windows下USB磁碟開發系列二:列舉系統中所有USB裝置》)。有個時候我們不僅僅需要獲取U盤碟符(路徑),而且需要獲取該U盤的硬體資訊,比如廠商、friendl
Zookeeper 原始碼(三)Zookeeper 客戶端原始碼
Zookeeper 原始碼(三)Zookeeper 客戶端原始碼 Zookeeper 客戶端由以下幾個核心元件組成: 類 說明 Zookeeper Zookeeper 客戶端入口 ClientWatch
mysql系列詳解三:mysql中各類日誌詳解-技術流ken
1.前言 日誌檔案記錄了MySQL資料庫的各種型別的活動,MySQL資料庫中常見的日誌檔案有 查詢日誌,慢查詢日誌,錯誤日誌,二進位制日誌,中繼日誌 。下面分別對他們進行介紹。 2.查詢日誌 1.檢視查詢日誌變數 查詢日誌即檢視日誌記錄了所有對 My
呼叫鏈系列三:解讀UAVStack中的呼叫鏈技術
本專題前幾篇文章主要從架構層面介紹瞭如何實現分散式呼叫追蹤系統。這篇文章我們不談架構,就其中的一項關鍵技術實現進行深入探討:如何從超文字傳輸協議(HTTP)中獲取request和response的body和header。 在Java中,HTTP協議的請求/響應模型是由Servlet規範+Servlet容
Spring Boot系列(三):Spring Boot中Redis的使用
spring boot對常用的資料庫支援外,對nosql 資料庫也進行了封裝自動化。 redis介紹 Redis是目前業界使用最廣泛的記憶體資料儲存。相比memcached,Redis支援更豐富的資料結構,例如hashes, lists, sets等,同時支援資料持久
深入理解JAVA集合系列三:HashMap的死循環解讀
現在 最新 star and 場景 所有 image cap 時也 由於在公司項目中偶爾會遇到HashMap死循環造成CPU100%,重啟後問題消失,隔一段時間又會反復出現。今天在這裏來仔細剖析下多線程情況下HashMap所帶來的問題: 1、多線程put操作後,get操作導
深入理解JAVA I/O系列三:字符流詳解
buffer 情況 二進制文件 感到 復制代碼 使用範圍 轉換 fileread 方式 字符流為何存在 既然字節流提供了能夠處理任何類型的輸入/輸出操作的功能,那為什麽還要存在字符流呢?容我慢慢道來,字節流不能直接操作Unicode字符,因為一個字符有兩個字節,字節流一次只
JAVA通信系列三:Netty入門總結
hand list code end @override ada 群發消息 -s object 一、Netty學習資料 書籍《Netty In Action中文版》 對於Netty的十一個疑問http://news.cnblogs.com/n/205413/ 深入淺出Net
sed修煉系列(四):sed中的疑難雜癥
chan 滑動 特殊符號 container 源文件 重復 target 情況 即使 本文目錄:1 sed中使用變量和變量替換的問題2 反向引用失效問題3 "-i"選項的文件保存問題4 貪婪匹配問題5 sed命令"a"和"N"的糾葛 1.sed中使用變量和變量替換的問題
Skype For Business 2015綜合部署系列三:配置Skype後端SQL數據服務器
sql server 2012 部署 skype for business sfb後端數據服務器部署 sql安裝配置 windows 安裝sql 2012 本篇博文進入Skype for business 2015 綜合部署系列的第三部分:配置Skype for business 201