分散式協調-Zookeeper使用(Watcher、Curator、Session、Acl)
分散式協調-Zookeeper使用(Watcher、Session、Curator、Acl)
前面說到zk可以為shardingSphere當做動態配置的一箇中間件,然後聊了一下zk的大體介紹,本篇咱們聊聊他的一些常見的特性,並且對其進行相關闡釋,同時使用Curator作為Demo。本篇會聊到:
- 【State】:zk上每個節點除了儲存了節點資料,同時也儲存了一些節點的狀態資訊。我們會分析一下。
- 【Watcher(釋出訂閱)】:在shardSphere使用zk作為分散式配置的那一篇,我們在zk上手動修改了配置,然後ShardingSphere就可以感知到,其底層就是基於這個特性來做的。
- 【Session】:客戶端和session建立連線的流程。
- 【Acl】(許可權控制):因為不是誰都有許可權對zk上的node進行操作的,一旦操作不當,系統就可能宕機。
- 【Curator】:對zk的api封裝的一個客戶端框架。
Stat(每個節點的狀態和資訊)
czxid 表示該資料節點被建立時的事務ID mzxid 表示該節點最後一次被更新時的事務ID ctime 表示節點被建立的時間 mtime 表示該節點最後一次被更新的時間 version 資料節點的版本號,這裡其實是一種樂觀鎖。我們每次修改一個節點資料的時候,節點的version就會增加。那每個客戶端在修改節點的時候,帶一個version,當他們傳遞的version和當前version不一致的時候,就修改失敗。 cversion 子結點的版本號 aversion 節點的ACL版本號 ephemeralOwner 建立該臨時節點的會話的sessionID。如果該節點是持久節點,那麼這個屬性值為0 dataLength 資料內容的長度 numChildren 當前節點的子節點個數 pzxid 表示該節點的子節點裡最後一次被修改時的事務ID。 【version】:像上面解釋的一樣,現在我們的版本seq的版本是2,我們修改時帶上版本是1就無法修改。
Watcher
我們的shardingSphere使用zk作為配置中心,當在zk上修改了配置後,shardingSphere就能感知到,就是通過watcher做的,zk實際上是和我們的客戶端建立連線,並且主動通知客戶端有資料修改了
。我們這裡舉個例子。對某個節點建立一個監聽:所有命令帶w的都是可以進行監聽的
比如:我們在get的時候對某個節點進行監聽,那麼當其他客戶端對我們get的這個資料進行操作的時候,我們對這個節點監聽的節點就會收到訊息。
現在我們使用客戶端1對seq這個節點get的時候進行監聽
然後使用客戶端2對這個節點的值進行修改
這個時候節點一就能收到被監聽節點的修改資訊
問題是當我們再次這個資料進行修改的時候,修改的資訊並沒有被監聽到,這也就是說,這種方式只是一次性監聽。那如何進行每次修改都被監聽到呢?
- 迴圈監聽:我們看到,當收到資訊時候,我們是可以知道哪個節點被修改了,那我們就可以拿到這個節點再次進行監聽。
- addwatch:我們發現它命令中有一個addwatch,這個就是實現持續監聽的方法。
持久化監聽裡面提供了兩種方式:
【PERSISTENT】:持久化訂閱,針對當前節點的修改和刪除事件,以及當前節點的子節點的刪除和新增事件 【PERSISTENT_RECURSIVE】:持久化遞迴訂閱,在PERSISTENT的基礎上,增加了子節點修改的事件觸發,以及子節點的子節點的資料變化都會觸發相關事件(滿足遞迴訂閱特性) 那瞭解了這個特性就知道,我們的shardingSphere指定是對rules(儲存他的配置檔案的及節點)進行了addwather的監聽,這樣當我們修改了rules的資料,監聽這個節點的那些個shardingSphere就收到了資訊啦。
Session(當客戶端連線zkServer的時候,是一個非同步的狀態.)
- 客戶端向Zookeeper Server發起連線請求,此時狀態為CONNECTING
- 當連線建立好之後,Session狀態轉化為CONNECTED,此時可以進行資料的IO操作
- 如果Client和Server的連接出現丟失,則Client又會變成CONNECTING狀態
- 如果會話過期或者主動關閉連線時,此時連線狀態為CLOSE
- 如果是身份驗證失敗,直接結束
Curator(使用java操作zk)
pom
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency>View Code以下的程式碼,就是對zk進行操作的一個案例,包含了增刪改查。curator提供了兩種方式,同步和非同步的操作。
【增刪改查同步操作】
// 啟動以及連線zk private CuratorOperationExample(){ curatorFramework= CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString("192.168.43.3:2181") //讀寫分離(zookeeper-server) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .sessionTimeoutMs(15000) .build(); curatorFramework.start(); //啟動 } // 對節點進行操作 private void nodeCRUD() throws Exception { System.out.println("開始針對節點的CRUD操作"); //這個要給節點中儲存的資料 String value="Hello World"; //建立一個節點 String node=curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/node",value.getBytes()); System.out.println("節點建立成功:"+node); //儲存狀態資訊的物件 Stat stat=new Stat(); //獲取節點的value byte[] data=curatorFramework.getData().storingStatIn(stat).forPath(node); System.out.println("節點value值:"+new String(data)); //這裡使用先查詢,然後修改,因為可能有多個客戶端同時連線。可能存在鎖的問題。這裡查詢出來version然後再進行操作 stat=curatorFramework.setData() .withVersion(stat.getVersion()) .forPath(node,"Update Date Result".getBytes()); String result=new String(curatorFramework.getData().forPath(node)); System.out.println("修改節點之後的資料:"+result); System.out.println("開始刪除節點"); curatorFramework.delete().forPath(node); Stat existStat=curatorFramework.checkExists().forPath(node); if(existStat==null){ System.out.println("節點刪除成功"); } }View Code【非同步增操作】
// 建立一個節點的非同步方式,其他的操作都有對應的api, // 這裡的CountDownLatch是為了不讓執行緒直接跑下去, // 要是直接跑下去的話,就看不到建立的節點了,因為是非同步的,而當執行到獲取節點的時候,可能還沒有建立好,只是為了看見這個節點內容而已,沒有任何作用。 public void asyncCRUD() throws Exception { CountDownLatch countDownLatch=new CountDownLatch(1); // ZK會回撥BackgroundCallback裡面的方法進行回撥 String node=curatorFramework.create().withMode(CreateMode.PERSISTENT) .inBackground((session,event)->{ System.out.println(Thread.currentThread().getName()+":執行建立節點:"+event.getPath()); countDownLatch.countDown(); //觸發回撥,遞減計數器 }).forPath("/async-node"); countDownLatch.await(); }View Code【Acl操作:digest】digest為例因為他是對於每次會話的授權,那我們在建立連線的時候就要加上,【.authorization("digest","glen:glen".getBytes())】
//創醬一個acl節點 //scheme 是digest id是glen:glen 許可權是ala private void aclOperation() throws Exception { Id id=new Id("digest", DigestAuthenticationProvider.generateDigest("glen:glen")); List<ACL> acls=new ArrayList<>(); acls.add(new ACL(ZooDefs.Perms.ALL,id)); String node=curatorFramework.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls,false).forPath("/curator-auth","Auth".getBytes()); System.out.println("建立帶有許可權節點:"+node); System.out.println("資料查詢結果:"+new String(curatorFramework.getData().forPath(node))); }View Code【一次性監聽】
// 一次性監聽 public void normalWatcher() throws Exception { CuratorWatcher curatorWatcher=new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { System.out.println("監聽到的事件"+watchedEvent.toString()); //迴圈設定監聽 curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath()); } }; // 創醬一個節點 String node=curatorFramework.create().forPath("/watcher","Watcher String".getBytes()); System.out.println("節點建立成功:"+node); //設定一次普通的watcher監聽 String data=new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node)); System.out.println("設定監聽並獲取節點資料:"+data); //第一次操作才會觸發監聽,而第二次不會。所以上面在回撥方法中我們設定了一個迴圈監聽。 curatorFramework.setData().forPath(node,"change data 0".getBytes()); Thread.sleep(1000); curatorFramework.setData().forPath(node,"change data 1".getBytes()); }View Code【持續化監聽】
//持久化監聽 //node :要監聽的節點名稱 private void persisWatcher(String node){ CuratorCache curatorCache=CuratorCache. //例項;去監聽的節點;操作型別(單節點快取,對資料進行壓縮,關閉或不清理快取) build(curatorFramework,node, CuratorCache.Options.SINGLE_NODE_CACHE); //這裡可以設定對於事件的監聽型別 CuratorCacheListener listener=CuratorCacheListener .builder() //這裡是我們自己寫的一個類,他會呼叫我們類的方法,這個實現了他的CuratorCacheListener介面 .forAll(new ZookeeperWatcherListener()) .build(); //把事件監聽新增進去 curatorCache.listenable().addListener(listener); curatorCache.start(); }View Code【測試】
//這裡對節點進行操作,看看是不是會別檢測到事件 private void operation(String node) throws Exception { curatorFramework.create().forPath(node); curatorFramework.setData().forPath(node,"hello".getBytes()); curatorFramework.delete().forPath(node); } public static void main(String[] args) throws Exception { ZookeeperWatchExample zookeeperWatchExample=new ZookeeperWatchExample(); String node="/persis-node"; zookeeperWatchExample.persisWatcher(node); zookeeperWatchExample.operation(node); //讓main方法等待 System.in.read(); }View Code
ACL許可權控制
zk針對節點提供了許可權的控制,這是因為要規避有人不小心刪除了某個節點,而導致整個系統出現問題的情況。他和linux的許可權控制相似。
他的許可權標誌符是這樣的:【scheme:id:perm】
比如我們獲取seq節點,他的scheme就是world(全部都能訪問),id就是所有人,permission就是增、刪、改、查、管理
- Scheme(許可權模式),標識授權策略,即表示通過什麼樣子的方式去控制權限。
- 【world】:預設方式,相當於全部都能訪問。
- 【auth】:代表已經認證通過的使用者(cli中可以通過addauth digest user:pwd 來添加當前上下文中的授權使用者)
- 【digest】:即使用者名稱:密碼這種方式認證
- 【ip】:通過ip地址來做許可權控制。
- ID(授權物件):比如說我們的scheme是ip,那這裡就填寫ip,如果是digest,那就填寫使用者名稱和密碼
- Permission:授予的許可權 (c) create . (d)delete (r)read (w)write (a)admin
【world】通過get和set【acl】命令去修改一個節點的許可權,一個world的例子。
【auth】 對登入使用者進行授權,授權後可以操作授權的節點,而退出客戶端後,再次進入就需要再次授權了。前面的glen是使用者名稱 後面的glen是密碼
當退出後,我們就無法操作atuh這個節點,必須再次進行授權了。