1. 程式人生 > 其它 >分散式協調-Zookeeper使用(Watcher、Curator、Session、Acl)

分散式協調-Zookeeper使用(Watcher、Curator、Session、Acl)

分散式協調-Zookeeper使用(Watcher、Session、Curator、Acl)

前面說到zk可以為shardingSphere當做動態配置的一箇中間件,然後聊了一下zk的大體介紹,本篇咱們聊聊他的一些常見的特性,並且對其進行相關闡釋,同時使用Curator作為Demo。本篇會聊到:

  • 【State】:zk上每個節點除了儲存了節點資料,同時也儲存了一些節點的狀態資訊。我們會分析一下。
  • 【Watcher(釋出訂閱)】:在shardSphere使用zk作為分散式配置的那一篇,我們在zk上手動修改了配置,然後ShardingSphere就可以感知到,其底層就是基於這個特性來做的。
  • 【Session】:客戶端和session建立連線的流程。
  • 【Acl】(許可權控制):因為不是誰都有許可權對zk上的node進行操作的,一旦操作不當,系統就可能宕機。
  • 【Curator】:對zk的api封裝的一個客戶端框架。

Stat(每個節點的狀態和資訊)

czxid 表示該資料節點被建立時的事務ID
mzxid 表示該節點最後一次被更新時的事務ID
ctime 表示節點被建立的時間
mtime 表示該節點最後一次被更新的時間
version 資料節點的版本號,這裡其實是一種樂觀鎖。我們每次修改一個節點資料的時候,節點的version就會增加。那每個客戶端在修改節點的時候,帶一個version,當他們傳遞的version和當前version不一致的時候,就修改失敗。
cversion 子結點的版本號
aversion 節點的ACL版本號
ephemeralOwner 建立該臨時節點的會話的sessionID。如果該節點是持久節點,那麼這個屬性值為0
dataLength 資料內容的長度
numChildren 當前節點的子節點個數
pzxid 表示該節點的子節點裡最後一次被修改時的事務ID。

version】:像上面解釋的一樣,現在我們的版本seq的版本是2,我們修改時帶上版本是1就無法修改。

Watcher

我們的shardingSphere使用zk作為配置中心,當在zk上修改了配置後,shardingSphere就能感知到,就是通過watcher做的,zk實際上是和我們的客戶端建立連線,並且主動通知客戶端有資料修改了

。我們這裡舉個例子。

對某個節點建立一個監聽:所有命令帶w的都是可以進行監聽的

比如:我們在get的時候對某個節點進行監聽,那麼當其他客戶端對我們get的這個資料進行操作的時候,我們對這個節點監聽的節點就會收到訊息。

現在我們使用客戶端1對seq這個節點get的時候進行監聽

然後使用客戶端2對這個節點的值進行修改

這個時候節點一就能收到被監聽節點的修改資訊

問題是當我們再次這個資料進行修改的時候,修改的資訊並沒有被監聽到,這也就是說,這種方式只是一次性監聽。那如何進行每次修改都被監聽到呢?

  • 迴圈監聽:我們看到,當收到資訊時候,我們是可以知道哪個節點被修改了,那我們就可以拿到這個節點再次進行監聽。
  • addwatch:我們發現它命令中有一個addwatch,這個就是實現持續監聽的方法。

 

持久化監聽裡面提供了兩種方式:

【PERSISTENT】:持久化訂閱,針對當前節點的修改和刪除事件,以及當前節點的子節點的刪除和新增事件 【PERSISTENT_RECURSIVE】:持久化遞迴訂閱,在PERSISTENT的基礎上,增加了子節點修改的事件觸發,以及子節點的子節點的資料變化都會觸發相關事件(滿足遞迴訂閱特性) 那瞭解了這個特性就知道,我們的shardingSphere指定是對rules(儲存他的配置檔案的及節點)進行了addwather的監聽,這樣當我們修改了rules的資料,監聽這個節點的那些個shardingSphere就收到了資訊啦。

Session(當客戶端連線zkServer的時候,是一個非同步的狀態.)

  • 客戶端向Zookeeper Server發起連線請求,此時狀態為CONNECTING
  • 當連線建立好之後,Session狀態轉化為CONNECTED,此時可以進行資料的IO操作
  • 如果Client和Server的連接出現丟失,則Client又會變成CONNECTING狀態
  • 如果會話過期或者主動關閉連線時,此時連線狀態為CLOSE
  • 如果是身份驗證失敗,直接結束

Curator(使用java操作zk)

pom

    <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
View Code

以下的程式碼,就是對zk進行操作的一個案例,包含了增刪改查。curator提供了兩種方式,同步和非同步的操作。

【增刪改查同步操作】

   // 啟動以及連線zk
    private CuratorOperationExample(){
        curatorFramework= CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString("192.168.43.3:2181") //讀寫分離(zookeeper-server)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .sessionTimeoutMs(15000)
                .build();
        curatorFramework.start(); //啟動
    }

    // 對節點進行操作
    private void nodeCRUD() throws Exception {
        System.out.println("開始針對節點的CRUD操作");
        //這個要給節點中儲存的資料
        String value="Hello World";
        //建立一個節點
        String node=curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/node",value.getBytes());
        System.out.println("節點建立成功:"+node);
        //儲存狀態資訊的物件
        Stat stat=new Stat();
        //獲取節點的value
        byte[] data=curatorFramework.getData().storingStatIn(stat).forPath(node);
        System.out.println("節點value值:"+new String(data));
        //這裡使用先查詢,然後修改,因為可能有多個客戶端同時連線。可能存在鎖的問題。這裡查詢出來version然後再進行操作
        stat=curatorFramework.setData()
                .withVersion(stat.getVersion())
                .forPath(node,"Update Date Result".getBytes());
        String result=new String(curatorFramework.getData().forPath(node));
        System.out.println("修改節點之後的資料:"+result);
        System.out.println("開始刪除節點");
        curatorFramework.delete().forPath(node);
        Stat existStat=curatorFramework.checkExists().forPath(node);
        if(existStat==null){
            System.out.println("節點刪除成功");
        }
    }
View Code

【非同步增操作】

    // 建立一個節點的非同步方式,其他的操作都有對應的api,
    // 這裡的CountDownLatch是為了不讓執行緒直接跑下去,
    // 要是直接跑下去的話,就看不到建立的節點了,因為是非同步的,而當執行到獲取節點的時候,可能還沒有建立好,只是為了看見這個節點內容而已,沒有任何作用。
    public void asyncCRUD() throws Exception {
        CountDownLatch countDownLatch=new CountDownLatch(1);
        // ZK會回撥BackgroundCallback裡面的方法進行回撥
        String node=curatorFramework.create().withMode(CreateMode.PERSISTENT)
                .inBackground((session,event)->{
            System.out.println(Thread.currentThread().getName()+":執行建立節點:"+event.getPath());
            countDownLatch.countDown(); //觸發回撥,遞減計數器
        }).forPath("/async-node");
        countDownLatch.await();
    }
View Code

【Acl操作:digest】digest為例因為他是對於每次會話的授權,那我們在建立連線的時候就要加上,【.authorization("digest","glen:glen".getBytes())】

    //創醬一個acl節點
    //scheme 是digest id是glen:glen 許可權是ala
    private void aclOperation() throws Exception {
        Id id=new Id("digest", DigestAuthenticationProvider.generateDigest("glen:glen"));
        List<ACL> acls=new ArrayList<>();
        acls.add(new ACL(ZooDefs.Perms.ALL,id));
        String node=curatorFramework.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(acls,false).forPath("/curator-auth","Auth".getBytes());
        System.out.println("建立帶有許可權節點:"+node);
        System.out.println("資料查詢結果:"+new String(curatorFramework.getData().forPath(node)));
    }
View Code

一次性監聽

    // 一次性監聽
    public void normalWatcher() throws Exception {
        CuratorWatcher curatorWatcher=new CuratorWatcher() {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception {
                System.out.println("監聽到的事件"+watchedEvent.toString());
                //迴圈設定監聽
                curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath());
            }
        };
        // 創醬一個節點
        String node=curatorFramework.create().forPath("/watcher","Watcher String".getBytes());
        System.out.println("節點建立成功:"+node);
        //設定一次普通的watcher監聽
        String data=new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node));
        System.out.println("設定監聽並獲取節點資料:"+data);
        //第一次操作才會觸發監聽,而第二次不會。所以上面在回撥方法中我們設定了一個迴圈監聽。
        curatorFramework.setData().forPath(node,"change data 0".getBytes());
        Thread.sleep(1000);
        curatorFramework.setData().forPath(node,"change data 1".getBytes());
    }
View Code

持續化監聽

    //持久化監聽
    //node :要監聽的節點名稱
    private void persisWatcher(String node){
        CuratorCache curatorCache=CuratorCache.
                //例項;去監聽的節點;操作型別(單節點快取,對資料進行壓縮,關閉或不清理快取)
                build(curatorFramework,node, CuratorCache.Options.SINGLE_NODE_CACHE);
        //這裡可以設定對於事件的監聽型別
        CuratorCacheListener listener=CuratorCacheListener
                .builder()
                //這裡是我們自己寫的一個類,他會呼叫我們類的方法,這個實現了他的CuratorCacheListener介面
                .forAll(new ZookeeperWatcherListener())
                .build();
        //把事件監聽新增進去
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }
View Code

測試

//這裡對節點進行操作,看看是不是會別檢測到事件
    private void operation(String node) throws Exception {
        curatorFramework.create().forPath(node);
        curatorFramework.setData().forPath(node,"hello".getBytes());
        curatorFramework.delete().forPath(node);
    }

    public static void main(String[] args) throws Exception {
        ZookeeperWatchExample zookeeperWatchExample=new ZookeeperWatchExample();
        String node="/persis-node";
        zookeeperWatchExample.persisWatcher(node);
        zookeeperWatchExample.operation(node);
        //讓main方法等待
        System.in.read();
    }
View Code

ACL許可權控制

zk針對節點提供了許可權的控制,這是因為要規避有人不小心刪除了某個節點,而導致整個系統出現問題的情況。他和linux的許可權控制相似。

他的許可權標誌符是這樣的:【scheme:id:perm

比如我們獲取seq節點,他的scheme就是world(全部都能訪問),id就是所有人,permission就是增、刪、改、查、管理

  • Scheme(許可權模式),標識授權策略,即表示通過什麼樣子的方式去控制權限。
    • 【world】:預設方式,相當於全部都能訪問。
    • 【auth】:代表已經認證通過的使用者(cli中可以通過addauth digest user:pwd 來添加當前上下文中的授權使用者)
    • 【digest】:即使用者名稱:密碼這種方式認證
    • 【ip】:通過ip地址來做許可權控制。
  • ID(授權物件):比如說我們的scheme是ip,那這裡就填寫ip,如果是digest,那就填寫使用者名稱和密碼
  • Permission:授予的許可權 (c) create . (d)delete (r)read (w)write (a)admin

【world】通過get和set【acl】命令去修改一個節點的許可權,一個world的例子。

auth 對登入使用者進行授權,授權後可以操作授權的節點,而退出客戶端後,再次進入就需要再次授權了。前面的glen是使用者名稱 後面的glen是密碼

當退出後,我們就無法操作atuh這個節點,必須再次進行授權了。