Zookeeper(二)數據模型
Zookeeper數據模型ZNode
問題
-
ZK的數據模型ZNodes是什麽樣的:
樹形結構,每個葉子節點都存儲著數據,且可掛載子節點;
路徑作為葉子節點名稱,數據作為葉子節點內的數據;
-
Znode可以存儲什麽類型的數據
特性
-
watcher數據變更通知:客戶端在節點上設置監控,當節點發生變化時,會觸發監控,zk向客戶端發送通知
-
數據訪問:對存儲在命名空間的節點以原子方式讀取和寫入,每個節點都有一個訪問控制列表ACL
ACL(sechema? ?:id :?permision):
權限模式schema(IP,Digest,World,Super),
授權對象ID,
權限permission(CREATE,DELETE,READ,WRITE,ADMIN)
-
節點類型:
持久節點PERSISTENT,
持久順序節點PERSISTENT_SEQUENTIAL:按照創建先後順序添加數字後綴
臨時節點EPEMERAL:其生命周期與客戶端會話綁定,客戶端失效,節點被清除,且不能作為父節點
臨時順序節點EPEMERAL_SEQUENTIAL
-
版本:保證分布式數據原子性操作
每個節點都維護三種版本:
version數據內容版本號
cversion子節點版本號
aversionACL變更版本號
描述
- 路徑:以斜線分割
- 存儲空間:分層的命名空間,每個節點包含與之關聯的數據及子節點
- stat:每個znode維護一個stat結構,內包含數據更改的版本號(具體用於更新操作,類似悲觀鎖),acl更改,時間戳
ZKDatabase:類似數據庫
DataTree:數據庫內的結構-節點樹
ConcurrentHashMap<String, DataNode> nodes:節點樹上的節點集合
DataNode
DataNode是數據存儲的最小單元,其內部除了保存了結點的數據內容、ACL列表、節點狀態之外,還記錄了父節點的引用和子節點列表兩個屬性,其也提供了對子節點列表進行操作的接口。
public class DataNode implements Record { //節點數據 byte data[]; //節點的acl的映射;dataTree上另存map Long acl; //節點持久化在磁盤的狀態 public StatPersisted stat; //該節點的子節點集合 private Set<String> children = null; }
DataTree
public class DataTree {
//節點集合:k-路徑,v-節點
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
//數據監控:內部維護監控集合
private IWatchManager dataWatches;
//子節點監控
private IWatchManager childWatches;
//一個會話中短命的節點????
private final Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
}
ZKDatabase
zookeeper的內存數據庫,管理zookeeper的所有會話,dataTree存儲和事務日誌,它會定時向磁盤dump快照數據,同時在zk啟動時,會通過磁盤的事務日誌和快照文件恢復成一個完整的數據庫
public class ZKDatabase {
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//快照日誌:一個database一個snaplog
protected FileTxnSnapLog snapLog;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
}
基本操作
- 創建節點:聲明節點存儲路徑,節點存儲模式(持久化/臨時)
- 獲取節點
- 獲取子節點
- 更改節點並觸發watcher監控:會向客戶端發送通知,客戶端線程從WatcherManager中取出對應的Watcher對象來執行回調邏輯
public class ZKTest{
//聲明zk客戶端
private ZooKeeper zookeeper;
//數據存儲根路徑
private final String dir;
//數據訪問權限列表
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
//節點存儲模式:默認持久
private CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;
public ZKTest(ZooKeeper zookeeper, String dir, List<ACL> acl){
this.dir = dir;
if(acl != null){
this.acl = acl;
}
this.zookeeper = zookeeper;
}
// 1.創建節點
public boolean addNode(byte[] data){
try{
zookeeper.create(dir+"/"+prefix, data, acl, createMode);
return true;
}catch(KeeperException.NoNodeException e){
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
}
}
// 2.獲取節點
public byte[] getData(){
return zookeeper.getData(dir, false, null);
}
// 3.獲取路徑下的子節點
public List<String> getChildren(){
return childNames = zookeeper.getChildren(dir, watcher);
}
}
public class Zookeeper{
//根據給定的路徑path,訪問權限acl,存儲模式createMode等創建節點
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException{
final String clientPath = path;
//校驗路徑 且是否允許節點已存在 如果已存在路徑名稱+1 否就覆蓋
PathUtils.validatePath(clientPath, createMode.isSequential());
//根據createMode辨別如何創建節點
EphemeralType.validateTTL(createMode, -1);
//校驗acl列表是否為空
validateACL(acl);
//將chroot前置到clientPath
final String serverPath = prependChroot(clientPath);
//聲明請求頭 為請求服務端創建節點做準備
RequestHeader h = new RequestHeader();
h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
request.setAcl(acl);
//調用客戶端ClientCnxn提交請求:TODO
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
// 2.獲取節點
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
//watch true:將watcher留在節點上(沒有報錯的情況下)
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
//設置請求類型
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
//將服務端返回的狀態信息賦值到stat上 ????有什麽用
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
//將watcher註冊到某節點路徑上
public abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
}
// 3.獲取路徑下的子節點 和getData大致相同
....
}
Zookeeper(二)數據模型