1. 程式人生 > >zookeeper 事件監聽機制

zookeeper 事件監聽機制

1、概述

上一篇文章,我們對zookeeper中的資料組織結構、Leader選舉原理進行了講述(http://blog.csdn.net/yinwenjie/article/details/47613309)。這篇文章我們緊接上文講解zookeeper中的事件機制。並通過示例程式碼告訴讀者怎麼使用zookeeper中的事件通知器:watcher。

2、zookeeper中的監聽機制

按照上文中的講解,我們知道zookeeper主要是為了統一分散式系統中各個節點的工作狀態,在資源衝突的情況下協調提供節點資源搶佔,提供給每個節點了解整個叢集所處狀態的途徑。這一切的實現都依賴於zookeeper中的事件監聽和通知機制

2.1、zookeeper中的事件和狀態

事件和狀態構成了zookeeper客戶端連線描述的兩個維度。注意,網上很多帖子都是在介紹zookeeper客戶端連線的事件,但是忽略了zookeeper客戶端狀態的變化也是要進行監聽和通知的。這裡我們通過下面的兩個表詳細介紹zookeeper中的事件和狀態(zookeeper API中被定義為@Deprecated的事件和狀態就不介紹了):

  • zookeeper客戶端與zookeeper server連線的狀態
連線狀態 狀態含義
KeeperState.Expired 客戶端和伺服器在ticktime的時間週期內,是要傳送心跳通知的。這是租約協議的一個實現。客戶端傳送request,告訴伺服器其上一個租約時間,伺服器收到這個請求後,告訴客戶端其下一個租約時間是哪個時間點。當客戶端時間戳達到最後一個租約時間,而沒有收到伺服器發來的任何新租約時間,即認為自己下線(此後客戶端會廢棄這次連線,並試圖重新建立連線)。這個過期狀態就是Expired狀態
KeeperState.Disconnected 就像上面那個狀態所述,當客戶端斷開一個連線(可能是租約期滿,也可能是客戶端主動斷開)這是客戶端和伺服器的連線就是Disconnected狀態
KeeperState.SyncConnected 一旦客戶端和伺服器的某一個節點建立連線(注意,雖然叢集有多個節點,但是客戶端一次連線到一個節點就行了),並完成一次version、zxid的同步,這時的客戶端和伺服器的連線狀態就是SyncConnected
KeeperState.AuthFailed zookeeper客戶端進行連線認證失敗時,發生該狀態

需要說明的是,這些狀態在觸發時,所記錄的事件型別都是:EventType.None

  • zookeeper中的事件。當zookeeper客戶端監聽某個znode節點”/node-x”時:
zookeeper事件 事件含義
EventType.NodeCreated 當node-x這個節點被建立時,該事件被觸發
EventType.NodeChildrenChanged 當node-x這個節點的直接子節點被建立、被刪除、子節點資料發生變更時,該事件被觸發。
EventType.NodeDataChanged 當node-x這個節點的資料發生變更時,該事件被觸發
EventType.NodeDeleted 當node-x這個節點被刪除時,該事件被觸發。
EventType.None 當zookeeper客戶端的連線狀態發生變更時,即KeeperState.Expired、KeeperState.Disconnected、KeeperState.SyncConnected、KeeperState.AuthFailed狀態切換時,描述的事件型別為EventType.None

2.2、獲取相應的響應

我們詳細描述了zookeeper客戶端連線的狀態和zookeeper對znode節點監聽的事件型別,下面我們來講解如何建立zookeeper的watcher監聽。在zookeeper中,並沒有傳統的add****Listener這樣的註冊監聽器的方法。而是採用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)這樣的方式為某個znode註冊監聽。也可以通過zk.register(watcher)註冊預設監聽。

  • 無論哪一種註冊監聽的方式,都可以對EventType.None事件進行監聽,如果有多個監聽器,這些監聽器都會收到EventType.None事件。(後文實驗)

下表以node-x節點為例,說明呼叫的註冊方法和可監聽事件間的關係:

註冊方式 NodeCreated NodeChildrenChanged NodeDataChanged EventType.NodeDeleted
zk.getChildren(“/node-x”,watcher) 可監控 可監控
zk.exists(“/node-x”,watcher) 可監控 可監控 可監控
zk.getData(“/node-x”,watcher) 悖論 可監控 可監控

網上很多文章都會引用官方的一個事件表格,這裡我就不再引用了,請自行百度吧(反正我覺得80%是抄的,並沒有把事件對應的監聽關係說清楚),還不如看我這個

2.3、watcher機制

zookeeper中的watcher機制很特別,請注意以下一些關鍵的經驗提醒(這些經驗提醒在其他地方找不到):

  • 一個節點可以註冊多個watcher,但是分成兩種情況,當一個watcher例項多次註冊時,zkClient也只會通知一次;當多個不同的watcher例項都註冊時,zkClient會依次進行通知(並不是很多網貼粗略說的“多次註冊一次通知”),後文將會有實驗。

  • 監控同一個節點X的一個watcher例項,通過exist、getData等註冊方式多次註冊的,zkClient也只會通知一次。這個原理在很多網貼上也都有說明,後文我們同樣進行實驗。

  • 注意,很多網貼都說zk.getData(“/node-x”,watcher)這種註冊方式可以監控節點的NodeCreated事件,實際上是不行的(或者說沒有意義)。當一個節點還不存在時,zk.getData這樣設定的watcher是會丟擲KeeperException$NoNodeException異常的,這次註冊會失敗,watcher也不會起作用;一旦node-x節點存在了,那這個節點的NodeCreated事件又有什麼意義呢?(後文做實驗)

  • zookeeper中並沒有“永久監聽”這種機制。網上所謂實現了”永久監聽”的帖子,只是一種程式設計技巧。思路可以歸為兩類:一種是“在保證所有節點的watcher都被重新註冊”的前提下,再進行目錄、子目錄的更改;另外一種是“在監聽被觸發後,被重新註冊前,重新取一次節點的資訊”確保在“監聽真空期”znode沒有變化。 有興趣的讀者可自行百度。

下圖可以反映zookeeper-watcher的監聽真空期:

這裡寫圖片描述

我本人對真空期的處理,更傾向於,註冊監聽後主動檢查本次節點的znode-version和上次節點的znode-version是否一致,來確定是否真空期有節點變化。

3、程式碼示例

3.1、驗證監聽

3.1.1、驗證對一個znode多次註冊watcher

為了簡單起見,我們先檢驗一個最好檢驗的東西,就是為一個znode註冊多個watcher時,watcher的通知機制到底是什麼樣的。這樣依賴,第一次接觸zookeeper的讀者也可以根據程式碼,快速上手。我們依據前文建立的zookeeper叢集,啟動了zookeeper的三個工作節點,並註冊watcher(我們只會使用其中的一個):

然後我們加測,使用getDate方法是否能夠檢測一個不存在的節點“Y”的建立事件。

package com.yinwenjie.test.zookeepertest.test;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.util.Log4jConfigurer;

import com.yinwenjie.test.zookeepertest.TestZookeeperAgainst;

/**
 * 這個測試類測試多個watcher監控某一個znode節點的效果。<br>
 * servers:192.168.61.129:2181,192.168.61.130:2181,192.168.61.132:2181<br>
 * 為了驗證zk叢集的工作效果,我們選擇一個節點進行連線(192.168.61.129)。
 * @author yinwenjie
 */
public class TestManyWatcher implements Runnable {
    static {
        try {
            Log4jConfigurer.initLogging("classpath:log4j.properties");
        } catch (FileNotFoundException ex) {
            System.err.println("Cannot Initialize log4j");
            System.exit(-1);
        }
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(TestZookeeperAgainst.class);

    public static void main(String[] args) throws Exception {
        TestManyWatcher testManyWatcher = new TestManyWatcher();
        new Thread(testManyWatcher).start();
    }

    public void run() {
        /*
         * 驗證過程如下:
         * 1、驗證一個節點X上使用exist方式註冊的多個監聽器(ManyWatcherOne、ManyWatcherTwo),
         *      在節點X發生create事件時的事件通知情況
         * 2、驗證一個節點Y上使用getDate方式註冊的多個監聽器(ManyWatcherOne、ManyWatcherTwo),
         *      在節點X發生create事件時的事件通知情況
         * */
        //預設監聽:註冊預設監聽是為了讓None事件都由預設監聽處理,
        //不干擾ManyWatcherOne、ManyWatcherTwo的日誌輸出
        ManyWatcherDefault watcherDefault = new ManyWatcherDefault();
        ZooKeeper zkClient = null;
        try {
            zkClient = new ZooKeeper("192.168.61.129:2181", 120000, watcherDefault);
        } catch (IOException e) {
            TestManyWatcher.LOGGER.error(e.getMessage(), e);
            return;
        }
        //預設監聽也可以使用register方法註冊
        //zkClient.register(watcherDefault);

        //1、========================================================
        TestManyWatcher.LOGGER.info("=================以下是第一個實驗");
        String path = "/X";
        ManyWatcherOne watcherOneX = new ManyWatcherOne(zkClient , path);
        ManyWatcherTwo watcherTwoX = new ManyWatcherTwo(zkClient , path);
        //註冊監聽,注意,這裡兩次exists方法的執行返回都是null,因為“X”節點還不存在
        try {
            zkClient.exists(path, watcherOneX);
            zkClient.exists(path, watcherTwoX);
            //建立"X"節點,為了簡單起見,我們忽略許可權問題。
            //並且建立一個臨時節點,這樣重複跑程式碼的時候,不用去server上手動刪除)
            zkClient.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            TestManyWatcher.LOGGER.error(e.getMessage(), e);
            return;
        }
        //TODO 注意觀察日誌,根據原理我們猜測理想情況下ManyWatcherTwo和ManyWatcherOne都會被通知。

        //2、========================================================
        TestManyWatcher.LOGGER.info("=================以下是第二個實驗");
        path = "/Y";
        ManyWatcherOne watcherOneY = new ManyWatcherOne(zkClient , path);
        ManyWatcherTwo watcherTwoY = new ManyWatcherTwo(zkClient , path);
        //註冊監聽,注意,這裡使用兩次getData方法註冊監聽,"Y"節點目前並不存在
        try {
            zkClient.getData(path, watcherOneY, null);
            zkClient.getData(path, watcherTwoY, null);
        } catch (Exception e) {
            TestManyWatcher.LOGGER.error(e.getMessage(), e);
        }
        //TODO 注意觀察日誌,因為"Y"節點不存在,所以getData就會出現異常。watcherOneY、watcherTwoY的註冊都不起任何作用。
        //然後我們在報了異常的情況下,建立"Y"節點,根據原理,不會有任何watcher響應"Y"節點的create事件
        try {
            zkClient.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            TestManyWatcher.LOGGER.error(e.getMessage(), e);
            return;
        }

        //下面這段程式碼可以忽略,是為了觀察zk的原理。讓守護執行緒保持不退出
        synchronized(this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                TestManyWatcher.LOGGER.error(e.getMessage(), e);
                System.exit(-1);
            }
        }
    }
}

/**
 * 這是預設的watcher實現。
 * @author yinwenjie
 */
class ManyWatcherDefault implements Watcher {
    /**
     * 日誌
     */
    private static Log LOGGER = LogFactory.getLog(ManyWatcherDefault.class);

    public void process(WatchedEvent event) {
        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();
        ManyWatcherDefault.LOGGER.info("=========預設監聽到None事件:keeperState = " 
                + keeperState + "  :  eventType = " + eventType);
    }
}

/**
 * 這是第一種watcher
 * @author yinwenjie
 */
class ManyWatcherOne implements Watcher {
    /**
     * 日誌
     */
    private static Log LOGGER = LogFactory.getLog(ManyWatcherOne.class);

    private ZooKeeper zkClient;

    /**
     * 被監控的znode地址
     */
    private String watcherPath;

    public ManyWatcherOne(ZooKeeper zkClient , String watcherPath) {
        this.zkClient = zkClient;
        this.watcherPath = watcherPath;
    }

    public void process(WatchedEvent event) {
        try {
            this.zkClient.exists(this.watcherPath, this);
        } catch (Exception e) {
            ManyWatcherOne.LOGGER.error(e.getMessage(), e);
        }
        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();
        //這個屬性是發生事件的path
        String eventPath = event.getPath();

        ManyWatcherOne.LOGGER.info("=========ManyWatcherOne監聽到" + eventPath + "地址發生事件:"
                + "keeperState = " + keeperState + "  :  eventType = " + eventType);
    }
}

/**
 * 這是第二種watcher
 * @author yinwenjie
 */
class ManyWatcherTwo implements Watcher {
    /**
     * 日誌
     */
    private static Log LOGGER = LogFactory.getLog(ManyWatcherOne.class);

    private ZooKeeper zkClient;

    /**
     * 被監控的znode地址
     */
    private String watcherPath;

    public ManyWatcherTwo(ZooKeeper zkClient, String watcherPath) {
        this.zkClient = zkClient;
        this.watcherPath = watcherPath;
    }

    public void process(WatchedEvent event) {
        try {
            this.zkClient.exists(this.watcherPath, this);
        } catch (Exception e) {
            ManyWatcherTwo.LOGGER.error(e.getMessage(), e);
        }
        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();
        //這個屬性是發生事件的path
        String eventPath = event.getPath();

        ManyWatcherTwo.LOGGER.info("=========ManyWatcherTwo監聽到" + eventPath + "地址發生事件:"
                + "keeperState = " + keeperState + "  :  eventType = " + eventType);
    }
}
  • 程式碼中的註釋自我感覺寫得比較詳細,這裡就不再介紹了。以下是執行這段測試程式碼後,所執行的Log4j的日誌資訊。
[2015-08-18 19:27:37] INFO  Thread-0 Initiating client connection, connectString=192.168.61.129:2181 sessionTimeout=120000 w[email protected]6db38815 (ZooKeeper.java:379)
  [2015-08-18 19:27:37] INFO  Thread-0 =================以下是第一個實驗 (TestManyWatcher.java:67)
  [2015-08-18 19:27:37] INFO  Thread-0-SendThread() Opening socket connection to server /192.168.61.129:2181 (ClientCnxn.java:1061)
  [2015-08-18 19:27:37] INFO  Thread-0-SendThread(192.168.61.129:2181) Socket connection established to 192.168.61.129/192.168.61.129:2181, initiating session (ClientCnxn.java:950)
  [2015-08-18 19:27:37] INFO  Thread-0-SendThread(192.168.61.129:2181) Session establishment complete on server 192.168.61.129/192.168.61.129:2181, sessionid = 0x14f40902e2a0000, negotiated timeout = 40000 (ClientCnxn.java:739)
  [2015-08-18 19:27:37] INFO  Thread-0-EventThread =========預設監聽到None事件:keeperState = SyncConnected  :  eventType = None (TestManyWatcher.java:130)
  [2015-08-18 19:27:37] INFO  Thread-0 =================以下是第二個實驗 (TestManyWatcher.java:85)
  [2015-08-18 19:27:37] INFO  Thread-0-EventThread =========ManyWatcherTwo監聽到/X地址發生事件:keeperState = SyncConnected  :  eventType = NodeCreated (TestManyWatcher.java:206)
  [2015-08-18 19:27:37] INFO  Thread-0-EventThread =========ManyWatcherOne監聽到/X地址發生事件:keeperState = SyncConnected  :  eventType = NodeCreated (TestManyWatcher.java:168)
  [2015-08-18 19:27:37] ERROR Thread-0 KeeperErrorCode = NoNode for /Y (TestManyWatcher.java:94)
  org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /Y
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
    at com.yinwenjie.test.zookeepertest.test.TestManyWatcher.run(TestManyWatcher.java:91)
    at java.lang.Thread.run(Unknown Source)

在我們呼叫getData,丟擲異常後,我們試圖建立“Y”節點。並且發現沒有任何監聽日誌輸出。於是我們肯定了上文中的兩個描述:

  • 針對一個節點發生的事件,zkClient是不是做多次watcher通知,和使用什麼方法註冊的沒有關係,關鍵在於所註冊的watcher例項是否為同一個例項。

  • 使用getDate註冊一個不存在的節點的監聽,並試圖監聽這個節點create event是無法實現的。因為會丟擲NoNodeException異常,註冊watcher的動作也會變得無效。

3.1.2、驗證default watcher監聽EventType.None事件

這個測試在指定了預設watcher監聽和沒有指定預設watcher監聽的兩種情況下。zkClient對Event-NONE事件的響應機制。

package com.yinwenjie.test.zookeepertest.test;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.springframework.util.Log4jConfigurer;

/**
 * 這個測試類測試在指定了預設watcher,並且有不止一個watcher例項的情況下。zkClient對Event-NONE事件的響應機制。
 * servers:192.168.61.129:2181,192.168.61.130:2181,192.168.61.132:2181<br>
 * 我們選擇一個節點進行連線(192.168.61.129),這樣好在主動停止這個zk節點後,觀察watcher的響應情況。
 * @author yinwenjie
 */
public class TestEventNoneWatcher implements Runnable {

    static {
        try {
            Log4jConfigurer.initLogging("classpath:log4j.properties");
        } catch (FileNotFoundException ex) {
            System.err.println("Cannot Initialize log4j");
            System.exit(-1);
        }
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(TestEventNoneWatcher.class);

    private ZooKeeper zkClient = null;

    public static void main(String[] args) throws Exception {
        TestEventNoneWatcher testEventNoneWatcher = new TestEventNoneWatcher();
        new Thread(testEventNoneWatcher).start();
    }

    public void run() {
        /*
         * 驗證過程如下:
         * 1、連線zk後,並不進行進行預設的watcher的註冊,並且使用exist方法註冊一個監聽節點"X"的監聽器。
         *      (完成後主執行緒進入等待狀態)
         * 2、關閉192.168.61.129:2181這個zk節點,讓Disconnected事件發生。
         *      觀察到底是哪個watcher響應這些None事件。
         * */
        //1、========================================================
        //註冊預設監聽
        EventNodeWatcherDefault watcherDefault = new EventNodeWatcherDefault(this);
        try {
            this.zkClient = new ZooKeeper("192.168.61.129:2181", 120000, watcherDefault);
        } catch (IOException e) {
            TestEventNoneWatcher.LOGGER.error(e.getMessage(), e);
            return;
        }

        String path = "/X";
        EventNodeWatcherOne eventNodeWatcherOne = new EventNodeWatcherOne(this.zkClient , path);
        //註冊監聽,注意,這裡兩次exists方法的執行返回都是null,因為“X”節點還不存在
        try {
            zkClient.exists(path, eventNodeWatcherOne);
            //建立"X"節點,為了簡單起見,我們忽略許可權問題。
            //並且建立一個臨時節點,這樣重複跑程式碼的時候,不用去server上手動刪除)
            zkClient.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            TestEventNoneWatcher.LOGGER.error(e.getMessage(), e);
            return;
        }

        //完成註冊後,主執行緒等待。然後關閉192.168.61.129上的zk節點
        synchronized(this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                TestEventNoneWatcher.LOGGER.error(e.getMessage(), e);
                System.exit(-1);
            }
        }
    }

    public ZooKeeper getZkClient() {
        return zkClient;
    }
}

/**
 * 這是預設的watcher實現。
 * @author yinwenjie
 */
class EventNodeWatcherDefault implements Watcher {
    /**
     * 日誌
     */
    private static Log LOGGER = LogFactory.getLog(EventNodeWatcherDefault.class);

    private TestEventNoneWatcher eventNoneWatcherThead;

    public EventNodeWatcherDefault(TestEventNoneWatcher eventNoneWatcherThead) {
        this.eventNoneWatcherThead = eventNoneWatcherThead;
    }

    public void process(WatchedEvent event) {
        //重新註冊監聽
        this.eventNoneWatcherThead.getZkClient().register(this);

        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();
        EventNodeWatcherDefault.LOGGER.info("=========預設EventNodeWatcher監聽到None事件:keeperState = " 
                + keeperState + "  :  eventType = " + eventType);
    }
}

/**
 * 這是第一種watcher
 * @author yinwenjie
 */
class EventNodeWatcherOne implements Watcher {
    /**
     * 日誌
     */
    private static Log LOGGER = LogFactory.getLog(EventNodeWatcherOne.class);

    private ZooKeeper zkClient;

    /**
     * 被監控的znode地址
     */
    private String watcherPath;

    public EventNodeWatcherOne(ZooKeeper zkClient , String watcherPath) {
        this.zkClient = zkClient;
        this.watcherPath = watcherPath;
    }

    public void process(WatchedEvent event) {
        try {
            this.zkClient.exists(this.watcherPath, this);
        } catch (Exception e) {
            EventNodeWatcherOne.LOGGER.error(e.getMessage(), e);
        }
        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();

        EventNodeWatcherOne.LOGGER.info("=========EventNodeWatcherOne監聽到事件:keeperState = " 
                + keeperState + "  :  eventType = " + eventType);
    }
}
  • 我們來執行這段程式碼。列印的Log4j的資訊如下:

這裡寫圖片描述

從log4j的日誌可以看到,預設的watcher,監聽到了zkClient的Event-None事件。而節點”X”的建立事件由EventNodeWatcherOne的例項進行了監聽。接下來測試程式碼進入了等待狀態。

然後我們關閉這個zkServer節點,並且觀察watcher的響應情況:

[[email protected] ~]# zkServer.sh stop
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
  • 以下是zk客戶端響應的log4j日誌資訊:

這裡寫圖片描述

紅圈處,我們看到zkServer的節點斷開後,EventType.None事件被已經註冊的兩個watcher分別響應了一次。這裡注意兩個異常:第一個異常是斷開連線後,socket報的錯誤,從錯誤中我們可以看到zookeeper客戶端的連線使用了sun的nio框架(如果您不知道nio請自行百度,或者關注我後續的博文);第二個錯,是在斷開連線後,EventNodeWatcherOne試圖重新使用exists方式註冊監聽,所以報錯。

可見EventType.None事件,會由所有的監聽器全部響應。所以這裡我們的程式設計建議是:一定要使用預設監聽器,並由預設監聽器來響應EventType.None事件;其他針對znode節點的接聽器,只針對節點事件進行處理,使用if語句進行過濾,如果發現是EventType.None事件,則忽略不作處理。

當然,任何編碼過程都是要根據您自己的業務規則來設計。以上的建議只是筆者針對一般業務情況的處理方式。

3.2、協調獨享資源的搶佔

下面的程式碼,用來向各位看官演示,zookeeper利用其znode機制,是怎麼完成資源搶佔的協調過程的。為了簡化程式碼片段,我沒有使用預設的watcher監聽,所以啟動的時候會報一個空指標錯誤是正常的,原因在org.apache.zookeeper.ClientCnxn的524行(3.4.5版本):

private void processEvent(Object event) {
    try {
     if (event instanceof WatcherSetEventPair) {
         // each watcher will process the event
         WatcherSetEventPair pair = (WatcherSetEventPair) event;
         for (Watcher watcher : pair.watchers) {
             try {
                 watcher.process(pair.event);
             } catch (Throwable t) {
                 LOG.error("Error while calling watcher ", t);
             }
         }
     } else {
         。。。。

通過這段程式碼,讀者還可以跟蹤出各種事件的響應方式。但這個不是本文中擴散討論的了,在我後續hadoop系列文章中,我還會介紹zookeeper,那時我們會深入討論zookeeper客戶端重連過程、更深層次的watcher事件機制。

話鋒轉回,下面的程式碼是如何使用zookeeper進行獨享資源的協調,同樣的,程式碼中註釋寫得比較清楚,就不在進行文字敘述了(程式碼是可以執行的,但是不建議拷貝貼上哈^_^):

package com.yinwenjie.test.zookeepertest;

import java.io.FileNotFoundException;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.springframework.util.Log4jConfigurer;

public class TestZookeeperAgainst implements Runnable {
    static {
        try {
            Log4jConfigurer.initLogging("classpath:log4j.properties");
        } catch (FileNotFoundException ex) {
            System.err.println("Cannot Initialize log4j");
            System.exit(-1);
        }
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(TestZookeeperAgainst.class);

    private ZooKeeper zk;

    /**
     * 代表“我”建立的子節點
     */
    private String myChildNodeName;

    public static void main(String[] args) throws Exception {
        TestZookeeperAgainst testZookeeperAgainst = new TestZookeeperAgainst();
        new Thread(testZookeeperAgainst).start();
    }

    public TestZookeeperAgainst() throws Exception {
        this.zk = new ZooKeeper("192.168.61.129:2181,192.168.61.130:2181,192.168.61.132:2181", 7200000, new DefaultWatcher());

        //建立一個父級節點filesq(如果沒有的話)
        Stat pathStat = null;
        try {
            pathStat = this.zk.exists("/filesq", null);
            //2.2如果條件成立,說明節點不存在(只需要判斷一個節點的存在性即可)
            if(pathStat == null) {
                this.zk.create("/filesq", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch(Exception e) {
            TestZookeeperAgainst.LOGGER.error(e.getMessage(), e);
            System.exit(-1);
        }
    }

    public void run() {
        /*
         * 當這個執行緒活動時,我們做以下幾個事情:
         * 1、首先註冊/filesq,檢控/filesq下子節點的變化
         * 2、向/filesq中註冊一個臨時的,帶有唯一編號的子節點,
         * 3、然後等待,直到AgainstWatcher發現已經輪到“自己”執行,並喚醒
         * 4、喚醒後,開始執行具體的業務。
         * 5、執行完成後,主動刪除這個子節點, 或者剝離與zk的連線(推薦前者,但怎麼操作,還是根據業務來)
         * */

        //1、==============
        String childPath = "/filesq/childnode";
        AgainstWatcher againstWatcher = new AgainstWatcher(this);
        try {
            //建立監聽
            this.zk.getChildren("/filesq", againstWatcher);
            //2、==============
            this.myChildNodeName = this.zk.create(childPath, "".getBytes(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            TestZookeeperAgainst.LOGGER.error(e.getMessage(), e);
            System.exit(-1);
        }
        TestZookeeperAgainst.LOGGER.info("被建立的子節點是:" + this.myChildNodeName);

        //3、==============
        synchronized(this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                TestZookeeperAgainst.LOGGER.error(e.getMessage(), e);
                System.exit(-1);
            }
        }

        //4、==============
        TestZookeeperAgainst.LOGGER.info("喚醒後,開始執行具體的業務=========");

        //5、==============
        try {
            this.zk.delete(this.myChildNodeName, -1);
        } catch (Exception e) {
            TestZookeeperAgainst.LOGGER.error(e.getMessage(), e);
            System.exit(-1);
        }
        //this.zk.close();

        //下面這段程式碼完全可以在正式使用時忽略,完全是為了觀察zk的原理
        synchronized(this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                TestZookeeperAgainst.LOGGER.error(e.getMessage(), e);
                System.exit(-1);
            }
        }
    }

    public String getMyChildNodeName() {
        return myChildNodeName;
    }

    public ZooKeeper getZk() {
        return zk;
    }
}

/**
 * 這個watcher專門用來監聽子級節點的變化
 * @author yinwenjie
 */
class AgainstWatcher implements Watcher {

    private static final Log LOGGER = LogFactory.getLog(AgainstWatcher.class);

    private TestZookeeperAgainst parentThread;

    public AgainstWatcher(TestZookeeperAgainst parentThread) {
        this.parentThread = parentThread;
    }

    public void process(WatchedEvent event) {
        /*
         * 作為znode觀察者,需要做以下事情:
         * 1、當收到一個監聽事件後,要馬上重新註冊zk,以便保證下次事件監聽能被接受
         * 2、比較當前/filesq下最小的一個節點A,如果這個節點A不是自己建立的,
         *      說明還不到自己執行,忽略後續操作
         * 3、如果A是自己建立的,則說明輪到自己佔有這個資源了,喚醒parentThread進行業務處理
         * */
        //1、=========================
        ZooKeeper zk = this.parentThread.getZk();
        List<String> childPaths = null;
        try {
            childPaths = zk.getChildren("/filesq", this);
        } catch (Exception e) {
            AgainstWatcher.LOGGER.error(e.getMessage(), e);
            System.exit(-1);
        }
        if(event.getType() != EventType.NodeChildrenChanged || childPaths == null || childPaths.size() == 0) {
            return;
        }

        //2、=========================
        String nowPath = "/filesq/" + childPaths.get(0);
        String myChildNodeName = this.parentThread.getMyChildNodeName();
        if(!StringUtils.equals(nowPath, myChildNodeName)) {
            return;
        }

        //3、=========================
        synchronized(this.parentThread) {
            this.parentThread.notify();
        }
    }
}
  • 4、後文介紹

和實驗室各位大神討論zookeeper的時間是8月13號,借這個機會我在Blog上面預釋出了hadoop系列的zookeeper文章3篇。相信各位讀者對zookeeper已經有一個大致瞭解,並能夠將其運用到自己的工作中了(還是那句話,網上的文章不能全信,實踐才是檢驗真理的唯一標準)。

這篇文章後,我會將寫作重點移回 架構設計:負載均衡層設計方案 系列文章,畢竟還差一篇 LVS + Keepalived + Nginx的整合文章,負載均衡層的介紹就可以告一段落了。在後面就是業務層的介紹了,我將重點介紹兩種訊息佇列和兩套SOA實現方案(我想,什麼Tomcat的優化、Spring的使用等等基礎知識,就不需要寫了吧。。),再次感謝各位對我部落格的關注。

最後推薦幾篇阿里團隊關於zookeeper的文章:

相關推薦

zookeeper 事件機制

1、概述 上一篇文章,我們對zookeeper中的資料組織結構、Leader選舉原理進行了講述(http://blog.csdn.net/yinwenjie/article/details/47613309)。這篇文章我們緊接上文講解zookeeper中的事件機制。並通過示

java中的key事件機制

com java.awt imp package 時間 ext javax .get pri package com.at221; import java.awt.event.KeyAdapter; import java.awt.event.KeyEvent; im

Java 中的事件機制

add import userdata 開發人員 util ner dns pre sta 看項目代碼時遇到了好多事件監聽機制相關的代碼。現學習一下: java事件機制包含三個部分:事件、事件監聽器、事件源。 1.事件:繼承自java.util.EventO

Java中的事件機制

void 初始化 release vax 輸入 logs p s get location 鼠標事件監聽機制的三個方面: 1.事件源對象:   事件源對象就是能夠產生動作的對象。在Java語言中所有的容器組件和元素組件都是事件監聽中的事件源對象。Java中根據事件的動作來區

JAVA 圖形開發之計算器設計(事件機制

oncommand image 事件監聽 str one 創建 dac orm mat /*文章中用到的代碼只是一部分,需要源碼的可通過郵箱聯系我 [email protected]*/ 前段時間剛幫同學用MFC寫了個計算器,現在學到JAVA的圖形開發,就試著水了一個計算器

Java事件機制與觀察者設計模式

idea demo1 script 操作 alt face ner over 方法 一. Java事件監聽機制 1. 事件監聽三要素: 事件源,事件對象,事件監聽器 2. 三要素之間的關系:事件源註冊事件監聽器後,當事件源上發生某個動作時,事件源就會調用事件監聽的一個方法,

spring事件機制

ide could 事情 task object 負責 his try and 事件機制的主要成員:   事件 事件監聽器(監聽事件觸發,處理一些事情) 事件源(發布事件) javaSE 提供了一系列自定義事件的標準。 EvenObject,為javaSE提供

UGUI的事件機制

UGUI控制元件的事件響應有很多種方式,比如使用元件EventTrigger來新增事件監聽,或者實現IDragHandler等介面,或者更直接地繼承EventTrigger來進行更靈話的呼叫。 下面分別就上面3種監聽方式進行講解,這些例子都是實現拖動Image的功能。 一、使用元件Event

自定義JAVA事件機制

JAVA中的事件機制的參與者有3種角色:Event、EventSource、Listener 1.Event:就是事件產生時具體的“事件”,用於Listener的相應的方法之中,作為引數一般存在於Listener的方法之中。 2.EventSource:事件源,它的作用主要是對事件和監聽進行

ZooKeeper Watcher機制(資料變更的通知)(二)(分析)

緊接著上一篇部落格:https://blog.csdn.net/Dongguabai/article/details/82970852 在輸出內容中有這樣兩個結果: 在ZooKeeper中,介面類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含Ke

Java的GUI學習四(事件機制)

事件監聽機制的特點:   1.事件源2.事件3.監聽器4.事件處理 比如密碼鎖設為(事件源) 將報警裝置裝的密碼鎖上。。。(時刻都有監聽器)  監聽器要註冊到事件源上面 如果有錘子砸(外部動作,如果是摸的話就沒有關係)密碼鎖。(事件產生了,封裝成物件,

Zookeeper機制api與原理

1.連線Zookeeper,註冊監聽 ZooKeeper zkCli = new ZooKeeper("192.168.50.183:2181,192.168.50.184:2181,192.168.50.185:2181", 3000, new Watcher() { //

JavaScript基礎(六)事件機制

事件監聽機制: * 概念:某些元件被執行了某些操作後,觸發某些程式碼的執行。 * 事件:某些操作。如: 單擊,雙擊,鍵盤按下了,滑鼠移動了 * 事件源:元件。如: 按鈕 文字輸入框... * 監聽器:程式碼。 * 註冊監聽:將事件,事件源,監聽器結合在

深入理解Spring的事件機制

目錄 1. 什麼是事件監聽機制 在講解事件監聽機制前,我們先回顧下設計模式中的觀察者模式,因為事件監聽機制可以說是在典型觀察者模式基礎上的進一步抽象和改進。我們可以在JDK或者各種開源框架比如Spring中看到它的身影,從這個意義上說,事件監聽機制也可以看做一種對傳統觀察者模式的具體實現,不同的框架對其實現

33 事件機制測試

1 自定義ApplicationContextInitializer package com.gp6.springboot27.initializer; import org.springframework.context.ApplicationContextInitializer

Spring中的事件機制在專案中的應用

最經在做專案的時候,呼叫某個介面的時候為了呼叫地圖,而不希望因為呼叫超時影響到主執行緒,使用了spring的時間監聽機制。 Spring中提供一些Aware相關的介面,BeanFactoryAware、 ApplicationContextAware、Reso

Zookeeper 事件 - 史上最詳解讀

目錄 寫在前面 1.1. Curator 事件監聽 1.1.1. Watcher 標準的事件處理器 1.1.2. NodeCache 節點快取的監聽 1.1.3. PathChildrenCache 子節點監聽 1.1.4. Tree Cache 節點樹快取 寫在最後 瘋狂

java事件機制,spring中的event listener模式和解耦

event,listener是observer模式一種體現,在spring 3.0.5中,已經可以使用annotation實現event和eventListner裡。我們以spring-webflow裡的hotel booking為例,看一下實現,步驟如下:1,建立eve

java事件機制(觀察者設計模式的實際運用)

package cn.yang.test.controller; /**java的事件監聽機制和觀察者設計模式 * Created by Dev_yang on 2016/3/1. */ publ

25 API-GUI(事件機制,介面卡模式),Netbeans的概述和使用(模擬登陸註冊GUI版)

 1:GUI(瞭解) (1)使用者圖形介面GUI:方便直觀CLI:需要記憶一下命令,麻煩(2)兩個包:java.awt:和系統關聯較強  ,屬重量級控制元件javax.swing:純Java編寫,增強了移植性,屬輕量級控制元件。(3)GUI的繼承體系元件:元件就是物件容器元