ZooKeeper watcher和version
ZooKeeper的用途:distributed coordination;maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Zookeeper的節點都是存放在記憶體中的,所以讀寫速度很快。更新日誌被記錄到了磁碟中,以便用於恢復資料。在更新內在中節點數之前,會先序列化到磁碟中。
為避免單點失效,zookeeper的資料是在多個server上留有備份的。不管客戶端連線到的是哪個server,它看到的資料都是一致的
所有的server 都必須知道彼此的存在。
zookeeper在讀寫比例為10:1時效能最佳。
每個znode上data的讀寫都是原子操作。
讀是區域性性的,即client只需要從與它相連的server上讀取資料即可;而client有寫請求的話,與之相連的server會通知leader,然後leader會把寫操作分發給所有server。所以定要比讀慢很多,leader負責寫操作以及分發,速度較慢。
在建立zookeeper連線時,給定的地址字串可以是這樣的:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a",以後的所有操作就都是在/app/a下進行的。
當client與一個server斷連線時(可能是因為server失效了),它就收不到任何watches;當它與另一個server建立好連線後,它就會收到"session expired"通知。
ACL不是遞迴的,它只針對當前節點,對子節點沒有任何影響。
預設情況下日誌檔案和資料檔案是放在同一個目錄下的,為縮短延遲提高響應性,你可以把日誌檔案單獨放在另一個目錄下。
為避免swaping,執行java時最好把可用物理內在調得大一些,比如對於4G的內在,可以把它調到3G。java有以下兩個執行引數:
-Xms<size>
設定虛擬機器可用記憶體堆的初始大小,預設單位為位元組,該大小為1024的整數倍並且要大於1MB,可用k(K)或m(M)為單位來設定較大的記憶體數。初始堆大小為2MB。
例如:-Xms6400K,-Xms256M
-Xmx<size>
設定虛擬機器記憶體堆的最大可用大小,預設單位為位元組。該值必須為1024整數倍,並且要大於2MB。可用k(K)或m(M)為單位來設定較大的記憶體數。預設堆最大值為64MB。
例如:-Xmx81920K,-Xmx80M
CreateMode
PERSISTENT:建立後只要不刪就永久存在
EPHEMERAL:會話結束年結點自動被刪除
SEQUENTIAL:節點名末尾會自動追加一個10位數的單調遞增的序號,同一個節點的所有子節點序號是單調遞增的 ,例如:node-0000000002
PERSISTENT_SEQUENTIAL:結合PERSISTENT和SEQUENTIAL
EPHEMERAL_SEQUENTIAL:結合EPHEMERAL和SEQUENTIAL
package basic;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class Demo {
private static final int TIMEOUT = 3000;
public static void main(String[] args) throws IOException {
ZooKeeper zkp = new ZooKeeper("localhost:2181", TIMEOUT, null);
try {
// 建立一個EPHEMERAL型別的節點,會話關閉後它會自動被刪除
zkp.create("/node1", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
if (zkp.exists("/node1", false) != null) {
System.out.println("node1 exists now.");
}
try {
// 當節點名已存在時再去建立它會丟擲KeeperException(即使本次的ACL、CreateMode和上次的不一樣)
zkp.create("/node1", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
} catch (KeeperException e) {
System.out.println("KeeperException caught:" + e.getMessage());
}
// 關閉會話
zkp.close();
zkp = new ZooKeeper("localhost:2181", TIMEOUT, null);
//重新建立會話後node1已經不存在了
if (zkp.exists("/node1", false) == null) {
System.out.println("node1 dosn't exists now.");
}
//建立SEQUENTIAL節點
zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
List<String> children = zkp.getChildren("/", null);
System.out.println("Children of root node:");
for (String child : children) {
System.out.println(child);
}
zkp.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
第一次執行輸出:
node1 exists now.
KeeperException caught:KeeperErrorCode = NodeExists for /node1
node1 dosn't exists now.
Children of root node:
node-0000000003
zookeeper
node-0000000002
node-0000000001
第二次執行輸出:
node1 exists now.
KeeperException caught:KeeperErrorCode = NodeExists for /node1
node1 dosn't exists now.
Children of root node:
node-0000000003
zookeeper
node-0000000002
node-0000000001
node-0000000007
node-0000000005
node-0000000006
注意兩次會話中建立的PERSISTENT_SEQUENTIAL節點序號並不是連續的,比如上例中缺少了node-0000000004.
Watcher & Version
watcher分為兩大類:data watches和child watches。getData()和exists()上可以設定data watches,getChildren()上可以設定child watches。
setData()會觸發data watches;
create()會觸發data watches和child watches;
delete()會觸發data watches和child watches.
如果對一個不存在的節點呼叫了exists(),並設定了watcher,而在連線斷開的情況下create/delete了該znode,則watcher會丟失。
在server端用一個map來存放watcher,所以相同的watcher在map中只會出現一次,只要watcher被回撥一次,它就會被刪除----map解釋了watcher的一次性。比如如果在getData()和exists()上設定的是同一個data watcher,呼叫setData()會觸發data watcher,但是getData()和exists()只有一個會收到通知。
1 import java.io.IOException;
2
3 import org.apache.zookeeper.CreateMode;
4 import org.apache.zookeeper.KeeperException;
5 import org.apache.zookeeper.WatchedEvent;
6 import org.apache.zookeeper.Watcher;
7 import org.apache.zookeeper.ZooDefs.Ids;
8 import org.apache.zookeeper.ZooKeeper;
9 import org.apache.zookeeper.data.Stat;
10
11 public class SelfWatcher implements Watcher{
12
13 ZooKeeper zk=null;
14
15 @Override
16 public void process(WatchedEvent event) {
17 System.out.println(event.toString());
18 }
19
20 SelfWatcher(String address){
21 try{
22 zk=new ZooKeeper(address,3000,this); //在建立ZooKeeper時第三個引數負責設定該類的預設建構函式,連線成功會觸發
23 zk.create("/root", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
24 }catch(IOException e){
25 e.printStackTrace();
26 zk=null;
27 }catch (KeeperException e) {
28 e.printStackTrace();
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32 }
33
34 void setWatcher(){
35 try {
36 Stat s=zk.exists("/root", true); //設定了一個watcher
37 if(s!=null){
38 zk.getData("/root", false, s);
39 }
40 } catch (KeeperException e) {
41 e.printStackTrace();
42 } catch (InterruptedException e) {
43 e.printStackTrace();
44 }
45 }
46
47 void trigeWatcher(){
48 try {
49 Stat s=zk.exists("/root", false); //此處不設定watcher
50 zk.setData("/root", "a".getBytes(), s.getVersion()); //修改資料時需要提供version,所以首先exists獲取stat,然後呼叫setData修改資料
51 }catch(Exception e){
52 e.printStackTrace();
53 }
54 }
55
56 void disconnect(){
57 if(zk!=null)
58 try {
59 zk.close();
60 } catch (InterruptedException e) {
61 e.printStackTrace();
62 }
63 }
64
65 public static void main(String[] args){
66 SelfWatcher inst=new SelfWatcher("127.0.0.1:2181");
67 inst.setWatcher();
68 inst.trigeWatcher();
69 inst.disconnect();
70 }
71
72 }
可以在建立Zookeeper時指定預設的watcher回撥函式,這樣在getData()、exists()和getChildren()收到通知時都會呼叫這個函式--只要它們在引數中設定了true。所以如果把程式碼22行的this改為null,則不會有任何watcher被註冊。
上面的程式碼輸出:
WatchedEvent state:SyncConnected type:None path:null
WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
之所會輸出第1 行是因為本身在建立ZooKeeper連線時就會觸發watcher。輸出每二行是因為在程式碼的第36行設定了true。
WatchEvent有三種類型:NodeDataChanged、NodeDeleted和NodeChildrenChanged。
呼叫setData()時會觸發NodeDataChanged;
呼叫create()時會觸發NodeDataChanged和NodeChildrenChanged;
呼叫delete()時上述三種event都會觸發。
如果把程式碼的第36--39行改為:
Stat s=zk.exists("/root", false); if(s!=null){ zk.getData("/root", true, s); }
或
Stat s=zk.exists("/root", true); if(s!=null){ zk.getData("/root", true, s); }
跟上面的輸出是一樣的。這也證明了watcher是一次性的。註冊的為同一個watcher,在setData時會觸發一次通知。
設定watcher的另外一種方式是不使用預設的watcher,而是在getData()、exists()和getChildren()中指定各自的watcher。示例程式碼如下:
1 public class SelfWatcher{ 2 3 ZooKeeper zk=null; 4 5 private Watcher getWatcher(final String msg){ 6 return new Watcher(){ 7 @Override 8 public void process(WatchedEvent event) { 9 System.out.println(msg+"\t"+event.toString()); 10 } 11 }; 12 } 13 14 SelfWatcher(String address){ 15 try{ 16 zk=new ZooKeeper(address,3000,null); //在建立ZooKeeper時第三個引數負責設定該類的預設建構函式 17 zk.create("/root", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 18 }catch(IOException e){ 19 e.printStackTrace(); 20 zk=null; 21 }catch (KeeperException e) { 22 e.printStackTrace(); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 void setWatcher(){ 29 try { 30 Stat s=zk.exists("/root", getWatcher("EXISTS")); 31 if(s!=null){ 32 zk.getData("/root", getWatcher("GETDATA"), s); 33 } 34 } catch (KeeperException e) { 35 e.printStackTrace(); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 } 40 41 void trigeWatcher(){ 42 try { 43 Stat s=zk.exists("/root", false); //此處不設定watcher 44 zk.setData("/root", "a".getBytes(), s.getVersion()); 45 }catch(Exception e){ 46 e.printStackTrace(); 47 } 48 } 49 50 void disconnect(){ 51 if(zk!=null) 52 try { 53 zk.close(); 54 } catch (InterruptedException e) { 55 e.printStackTrace(); 56 } 57 } 58 59 public static void main(String[] args){ 60 SelfWatcher inst=new SelfWatcher("127.0.0.1:2181"); 61 inst.setWatcher(); 62 inst.trigeWatcher(); 63 inst.disconnect(); 64 } 65 66 }
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
由於在exists()和getData()中都呼叫了getWatcher(),產生兩個Watcher例項放在了map中,所以exists()和getData()都會收到通知。由於16行建立Zookeeper時沒有設定watcher(引數為null),所以建立連線時沒有收到通知。並且這個兩個watcher收到通知的順序不一定。
關於Version:為了方便進行cache validations 和coordinated updates,每個znode都有一個stat結構體,其中包含:version的更改記錄、ACL的更改記錄、時間戳。znode的資料每更改一次,version就會加1。客戶端每次檢索data的時候都會把data的version一併讀出出來。修改資料時需要提供version。
zk.delete("/root", -1); //觸發data watches和children watches zk.getChildren("/root", getWatcher("LISTCHILDREN")); //getChildren()上可以設定children watches
輸出:
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
zk.delete("/root", -1); //觸發data watches和children watches Stat s=zk.exists("/root", getWatcher("EXISTS")); //exists()上可以設定data watches if(s!=null){ zk.getChildren("/root", getWatcher("LISTCHILDREN")); }
輸出:
EXISTS WatchedEvent state:SyncConnected type:NodeDeleted path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
zk.delete("/root", -1); //觸發data watches和children watches Stat s=zk.exists("/root", getWatcher("EXISTS")); if(s!=null){ zk.getData("/root", getWatcher("GETDATA"), s); zk.getChildren("/root", getWatcher("LISTCHILDREN")); }
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDeleted path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDeleted path:/root
tat s=zk.exists("/root", false); zk.setData("/root", "a".getBytes(), s.getVersion()); zk.delete("/root", -1); Stat s=zk.exists("/root", getWatcher("EXISTS")); if(s!=null){ zk.getData("/root", getWatcher("GETDATA"), s); zk.getChildren("/root", getWatcher("LISTCHILDREN")); }
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
按說data watches觸發了兩次,但是exists()和getData()只會收到一次通知。