1. 程式人生 > >Apache curator-recipes程式碼範例

Apache curator-recipes程式碼範例

Apache curator-recipes程式碼例項

? ? Apache curator-recipes元件提供了大量已經"生產化"(produced)的特性,極大的簡化了使用zk的複雜度.

? ? 1. Cache: 提供了對一個Node持續監聽,如果節點資料變更,即可立即得到響應. 開發者無需過度的關注watcher和Event操作.

? ? 2. Queues: 提供了重量級的分散式佇列解決方法,比如:權重佇列,可延遲佇列等.其實Zookeeper並不適合作為資料儲存系統,你可以適度的使用它來達成分散式佇列的設計要求.

? ? 3. Counters: 全域性計數器是分散式設計中很常用的,包括"全域性計算器"和"原子自增計數器".

? ? 4. Locks: 分散式鎖的設計有很多手段,此元件提供了分散式"讀寫分寫鎖"/"共享鎖"等.在我們需要控制資源訪問的情況下,非常有用.

? ? 5. Barries: 柵欄,需要對分散式環境中,多個操作(程序)進行同步或者協同時,可以考慮使用barries;

? ? 6. Elections: 選舉;可以在多個"註冊者"之間選舉出leader,作為操作排程/任務監控/佇列消費的執行者,我們在設計"leader角色選舉"/"單點任務執行""分散式佇列消費者"等場景時,非常有效.

?

? ? 程式碼例項

1. 建立Client

CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
		.sessionTimeoutMs(30000)
		.connectionTimeoutMs(30000)
		.canBeReadOnly(false)
		.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
		.namespace(namespace)
		.defaultData(null)
		.build();
client.start();
EnsurePath ensure = client.newNamespaceAwareEnsurePath(namespace);
//code for test
Thread.sleep(5000);
//client.close()//

?

2. Caches

? ? 持續watcher節點,並將節點的資料變更即時的在本地反應出來.recpise提供了PathChildrenCache和NodeCache兩個API.

public static PathChildrenCache pathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
	final PathChildrenCache cached = new PathChildrenCache(client, path, cacheData);
	cached.getListenable().addListener(new PathChildrenCacheListener() {
		@Override
		public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
			PathChildrenCacheEvent.Type eventType = event.getType();
			switch (eventType) {
				case CONNECTION_RECONNECTED:
					cached.rebuild();
					break;
				case CONNECTION_SUSPENDED:
				case CONNECTION_LOST:
					System.out.println("Connection error,waiting...");
					break;
				default:
					System.out.println("Data:" + event.getData().toString());
			}
		}
	});
	return cached;
}

?

PathChildrenCache cached = pathChildrenCache(client,path,true);
//= start() + rebuild()
//事件操作,將會在額外的執行緒中執行.
cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
List<ChildData> childData = cached.getCurrentData();
if (childData != null) {
	for (ChildData data : childData) {
		System.out.println("Path:" + data.getPath() + ",data" + new String(data.getData(), "utf-8"));
	}
}

//當不在需要關注此節點資料時,需要及時的關閉它.
//因為每個cached,都會額外的消耗一個執行緒.
//cached.close();////close the watcher,clear the cached Data

? ? 對於PathChildrenCache.getCurrentData()將從獲取本地的資料列表,而不是觸發一次zookeeper.getChildren(),因此為"Cache".

3. Queues:分散式佇列

? ? 分散式佇列的基本特性,就是"生產者"或"消費者"跨越多個程序,且在此種環境中需要確保佇列的push/poll的有序性.

? ? zookeeper本身並沒有提供分散式佇列的實現,只是recipse根據zookeeper的watcher和具有version標記的node,來間接的實現分散式queue..內部機制如下:

? ? --> 如果是消費者(QueueConsumer),會建立一個類似於PathChildrenCache的例項用於監聽queuePath下的子節點變更事件(單獨的執行緒中).同時consumer處於阻塞狀態,當有子節點變更事件時會被喚醒(包括建立子節點/刪除子節點等);

? ? --> 此時consumer獲取子節點列表,並將每個節點資訊封裝成Runnable任務單元,提交到執行緒池中.?

? ? --> Runnable中並執行QueueConsumer.consumer(.)方法.

? ? --> 如果是生產者,則釋出一個message時recipes將會在queuePath下建立一個PERSISTENT_SEQUENTIAL節點,同時儲存message資料. 消費時,也將按照節點的順序進行.

?

? ? 釋出訊息並沒有太多的問題僅僅是建立一個"有序"節點即可..但是對於消費者,那麼需要考慮的因數就很多,比如:1) 多個消費者同時消費時,需要確保訊息不能重複且有序 2) 訊息消費時,如果網路異常,怎麼辦?

? ? 對於QistributedQueue中,對上述問題的解決辦法也非常粗糙,內部機制如下:

? ? --> 如果使用了消費擔保(即指定了lockPath),在呼叫consumer方法之前,首先建立一個臨時節點(lockPath + 子節點),如果建立此臨時節點失敗也就意味著此訊息被其他消費者,則忽略此訊息.

? ? --> 然後從子節點中獲取資料,如果獲取失敗,意味著此節點已經被其他消費者刪除,則忽略此訊息.

? ? --> 然後呼叫consumer()方法,如果此方法丟擲異常,訊息將會再次新增到佇列中(刪除舊的子節點,建立一個新的子節點).如果消費正常,則刪除節點.

? ? --> 無論成敗,則刪除臨時節點(lockPath + 子節點)

?

? ? --> 如果沒有使用消費擔保,則首先獲取子節點的資料(getData),然後立即刪除此子節點

? ? -->呼叫consumer()方法.

?

? ? 問題:

? ? 1) 原始碼中,在獲取子節點的data時(getData)並沒有指定version校驗,我深度懷疑,當訊息被併發的消費時,是否有重複的可能.

? ? 2) 因為zookeeper本身獲取一個節點的子節點列表時,將得到所有的子節點,那麼就意味任何一個消費者中每次Event觸發,都將獲取整個childrenList,如果此列表很龐大,效能問題將是非常突出的.

? ? 3) 在消費擔保的情況下,每消費一個訊息,就會做"建立臨時節點""刪除臨時節點""獲取資料"等大量工作,如果有多個消費者同時執行,那麼對zk的操作次數將會倍數級增加,效能問題以及資料安全性問題,也是非常值得考慮的.

? ? 4) zookeeper已經限定了每個節點的資料尺寸,以及每個節點下子節點的個數,這對於實現規模性的分散式佇列,確實不是良好的選擇.

? ? 5) 當佇列中,消費者和生產者的速率不均衡時,問題將會更加嚴重,比如:快速的生產者 + 慢速的消費者;因為此時Event事件將會非常頻繁(網路消耗嚴重),對於消費者而言,執行緒上線切換鎖帶來的效能消耗,不可忽視.

?

? ? 最終,我們需要明確使用zookeeper作為分散式佇列的場景: 1) 佇列深度較小 2) 生產者和消費者的速度都非常的低且消費者消費速度更快,即單位時間內產生的訊息很少. ?3) 建議只有一個消費者.

? ? 比如: 一個數據分析系統,這個系統中有多個定時任務,當任務即將觸發時,向此分散式佇列中提交一個訊息,訊息中包含任務的ID,那麼誰消費了此任務ID,那麼誰就負責執行此任務.我們間接的實現了分散式任務單點執行的需求.

? ? 還有一種場景,比如實現"排它重入鎖"也可以使用DistributedQueue作為底層支撐.

?

? ? 消費者:DistributedQueueConusumer.java

public class DistributedQueueConsumer {

    private DistributedQueue<String> queue;

    public DistributedQueueConsumer(CuratorFramework client, String queuePath) throws Exception {
        QueueBuilder<String> builder = QueueBuilder.builder(client, new StringQueueConsumer(), new StringQueueSerializer(), queuePath);
        queue = builder.lockPath("queue-lock").buildQueue();//消費擔保
    }

    public void start() throws Exception {
        queue.start();
    }

    public void close() throws Exception {
        queue.close();
    }

    public static DistributedQueue distributedQueueAsProducer(CuratorFramework client, String path) throws Exception {
        QueueBuilder<String> builder = QueueBuilder.builder(client, null, new StringQueueSerializer(), path);
        builder.maxItems(1024);// 有界佇列,最大佇列深度,如果深度達到此值,將阻塞"生產者"建立新的節點.
        return builder.buildQueue();
    }

    //utils for producer and consumer
    static class StringQueueSerializer implements QueueSerializer<String> {
        private static final Charset charset = Charset.forName("utf-8");

        //as producer
        @Override
        public byte[] serialize(String item) {
            return item.getBytes(charset);
        }

        //as consumer
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, charset);
        }
    }


    class StringQueueConsumer implements QueueConsumer<String> {
        static final String TAG = "_TAG";
        @Override
        public void consumeMessage(String message) throws Exception {
            System.out.println("Consumer:" + message);
            if (message.equals(TAG)) {
                System.out.println("Tag message...ignore it.");
            }
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            switch (newState) {
                case RECONNECTED:
                    try {
                        //當連結重建之後,需要傳送一個TAG訊息,用於重新觸發本地的watcher,以便獲取新的children列表
                        queue.put(TAG);
                    } catch (Exception e) {
                        //
                    }
                    break;
                default:
                    System.out.println(newState.toString());
            }
        }
    }

}

? ? 生產者: DistributedQueueProducer.java

public class DistributedQueueProducer {

    private DistributedQueue<String> queue;

    public DistributedQueueProducer(CuratorFramework client, String queuePath) throws Exception {
        QueueBuilder<String> builder = QueueBuilder.builder(client, null, new StringQueueSerializer(), queuePath);
        queue = builder.lockPath("queue-lock").buildQueue();//消費擔保
    }

    public void start() throws Exception {
        queue.start();
    }

    public void close() throws Exception {
        queue.close();
    }

    public void put(String message) throws  Exception{
        queue.put(message);
    }

}

? ? ?

? ? Recipse還提供了其他2個API:

? ? 1) DistributedIdQueue: 內部基於DistributedQueue的所有機制,只是除了指定queue中訊息的內容之外,還可以指定一個ID,這個ID作為訊息的標記,最終此ID值將作為znode的path字尾.此後可以通過ID去消費(dequeue)一個訊息.佇列的排序方式是根據ID的字典順序--正序.

? ? 2) DistributedProrityQueue: 有權重的佇列,內部基於DistributedQueue,不過在釋出訊息時,需要指定此訊息的權重數字;佇列的排序方式為根據權重排序.

??

? ? 此外DistributedQueue的開發中,必須在QueueConsumer中關注"連結失效"的事件.

?

4. Counters

? ? 計數器,其中SharedCount可以用來監聽zookeeper中一個Integer型別的數字變更.此外還有一個更加基礎的API: SharedValue,你可以實現任意型別的計數器.

final SharedCount counter = new SharedCount(client,"/counter-vv",0);
counter.addListener(new SharedCountListener(){
	@Override
	public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
		//
		System.out.println("count changed:" + newCount);
	}

	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		switch (newState){
			case RECONNECTED:
				try {
					//當連結重建之後,需要手動fresh
					Integer current = counter.getCount();
					counter.trySetCount(current);//reflush,無論更新成敗,都會獲取最新的值
				} catch (Exception e) {
					//
				}
				break;
			default:
				System.out.println(newState.toString());
		}
	}
});
counter.start();

//counter.close();//取消watcher

? ? SharedCount中也需要關注"連結異常"的問題,我們可以通過註冊listener的方式,當連結重連成功後,重新獲取新的值.

? ? 在"計數器"中,還提供了DistributedAtomicInteger,DistributedAtomicLong兩個分散式自增計數器.

DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client,"/counter-vv",new RetryNTimes(32,1000));
AtomicValue<Integer> rc = atomicInteger.increment();
System.out.println("success:" + rc.succeeded() + ";before:" + rc.preValue() + ";after:" + rc.postValue());
//atomicInteger.add(1); //++
//atomicInteger.subtract(1); //--

?

5. Locks?

? ? 使用zookeeper作為分散式鎖,是一個普遍的需求,如下展示如何設計一個分散式重入鎖,其中一個path表示一個鎖資源.

public class DistributedLock{

    private InterProcessMutex lock;//重入的,排他的.
    private Map<Thread,Boolean> lockedThread = new WeakHashMap<Thread,Boolean>();

    private String lockPath;

    private ConnectionStateListener stateListener = new StateListener();

    private RevocationListener<InterProcessMutex> revocationListener;
    public DistributedLock(CuratorFramework client,String path){
        lockPath = path;
        revocationListener = new RevocationListener<InterProcessMutex>() {
            @Override
            public void revocationRequested(InterProcessMutex forLock) {
                if(!forLock.isAcquiredInThisProcess()){
                    return;
                }
                try{
                    forLock.release();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        };
        lock = createLock(client);
        lock.makeRevocable(revocationListener);
        client.getConnectionStateListenable().addListener(stateListener);
    }

    public boolean lock(){
        try{
            lock.acquire();
            lockedThread.put(Thread.currentThread(),Boolean.TRUE);
        } catch (Exception e){
            //
        }
        return false;
    }

    public void unlock(){
        try{
            lock.release();
        }catch (Exception e){
            //
        }
    }

    private InterProcessMutex createLock(CuratorFramework client){
        lock = new InterProcessMutex(client,lockPath);
        //協同中斷,如果其他執行緒/程序需要此鎖中斷時,呼叫此listener.
        lock.makeRevocable(revocationListener);
        client.getConnectionStateListenable().addListener(stateListener);
        return lock;
    }

    class StateListener implements ConnectionStateListener{
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if(Boolean.FALSE.equals(lockedThread.get(Thread.currentThread()))){
                return;//如果當前lock沒有獲取鎖,則忽略
            }
            switch (newState){
                case LOST:
                    //一旦丟失連結,就意味著zk server端已經刪除了鎖資料
                    lockedThread.clear();
                    lock = createLock(client);//must be rebuild
                    break;
                default:
                    System.out.println(newState.toString());

            }
        }
    }
}

? ? 底層的機制非常的簡單: "獲取鎖"的操作,就是在zookeeper中建立一個EPHEMERAL_SEQUENTIAL節點,同時對此節點的臨近節點註冊一個watcher;當"臨近節點"被刪除時,表示其他程序已經釋放了鎖,此watcher將會觸發,並喚醒當前執行緒,然後acquire方法返回.."釋放鎖"的操作,就是刪除此臨時節點.此時臨近的下一個節點將獲得鎖..

? ? 所謂"重入",就是同一個執行緒多次獲取鎖時,如果此執行緒已經持有了鎖(即建立了zk臨時節點),事實上將不會再次建立zk的臨時節點,而是直接返回.

? ? 因為"重入鎖",基於臨時節點的特性,因此必須關注client連結重建的問題;粗糙的解決辦法,就是每次連結重建(session過期),重新例項化lock物件.?

?

6. Barrier

? ? 柵欄, 可以用來協同分散式環境中的執行緒.讓他們有條件的阻塞,且同時喚醒.

DistributedBarrier barrier = new DistributedBarrier(client,"/barrier");
barrier.setBarrier(); //設定barrier
System.out.println("setBarrier...");
barrier.waitOnBarrier();//等待其他程序移除barrier,此後所有的waitOnBarrier程序都將解除阻塞.
//barrier.removeBarrier(); //移除barrier,解除阻塞.
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client,"/d-barrier",12);
System.out.println("enter...");
barrier.enter();//阻塞,直到12個成員加入
System.out.println("running...");
barrier.leave();//阻塞,直到12個成員離開

? ? 其中DistributedDoubleBarrier為雙端柵欄,可以讓N個執行緒(程序)同時開始,並且同時退出..

? ? 對於DistributedBarrier內部機制非常簡單: setBarrier()方法就是建立"柵欄"節點,removeBarrier()方法就是刪除此節點;當執行setBarrier之後,所有的waitOnBarrier()操作都將阻塞,直到刪除節點的事件觸發.

?

7.LeaderSelector

? ? 在很多場景中,我們需要"leader選舉";比如在分散式有很多task,這些task的執行時機需要一個"Leader"去排程.任何時候,同一個leaderPath節點下,只會有一個"leader"..如下展示如何簡單的使用LeaderSelector.

public class LeaderSelectorClient {

    private LeaderSelector selector;

    private final Object lock = new Object();
    private boolean isLeader = false;

    public LeaderSelectorClient(CuratorFramework client, String leaderPath) throws Exception {
        LeaderSelectorListener selectorListener = new LeaderSelectorListener() {
            //此方法將會在Selector的執行緒池中的執行緒呼叫
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("I am leader...");
                //如果takeLeadership方法被呼叫,說明此selector例項已經為leader
                //此方法需要阻塞,直到selector放棄leader角色
                isLeader = true;
                while (isLeader) {
                    synchronized (lock) {
                        lock.wait();
                    }
                }
            }

            //這個方法將會在Zookeeper主執行緒中呼叫---watcher響應時
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("Connection state changed...");
                //對於LeaderSelector,底層實現為對leaderPath節點使用了"排他鎖",
                //"排他鎖"的本質,就是一個"臨時節點"
                //如果接收到LOST,說明此selector例項已經丟失了leader資訊.
                if (newState == ConnectionState.LOST) {
                    isLeader = false;
                    synchronized (lock) {
                        lock.notifyAll();
                    }
                }
            }
        };
        selector = new LeaderSelector(client, leaderPath, selectorListener);
        //一旦leader釋放角色之後,是否繼續參與leader的選舉
        //此處需要關注CuratorFrameworker.RetryPolicy策略.
        //1) 如果leader是耐久性的,selector例項需要一致關注leader的狀態,可以autoRequeue
        //2) 如果leader再行使完任務之後,釋放,然後在此後的某個時刻再次選舉(比如定時任務),此處可以保持預設值false
        selector.autoRequeue();
    }


    public void start() {
        selector.start();
    }

    public void release() {
        //釋放leader角色
        isLeader = false;
        //takeLeadership方法將會中斷並返回.
        selector.interruptLeadership();
        synchronized (lock){
            lock.notifyAll();//
        }
    }

    //重新獲取leader角色--選舉
    public void take() {
        selector.requeue();
    }

    public void close() {
        isLeader = false;
        selector.close();
        synchronized (lock){
            lock.notifyAll();//
        }
    }

    public boolean isLeader() {
        return selector.hasLeadership();
    }

}

? ? 開發者使用LeaderSelector時,需要關注takeLeadership方法的內部邏輯,一旦takeLeadership方法被呼叫,那麼此selector已經是leader角色了,你可以在此方法中增加"事件通知"等來執行一些非同步的操作.

? ? isLeader()方法只能返回當前的狀態,有可能返回true之後不久,這個selector例項將不不再是leader,那麼就需要我們在listener中更多的關注stateChanged過程.

?

8. LeaderLatch

? ? 上述LeaderSelector開發中,開發者需要關注"連結異常"情況,也需要自己去阻塞leader角色變更,也需要自己去封裝"leader角色變更"時的事件處理器....recipse元件已經實現了LeaderLatch來初步解決上述問題,這個類是個便捷的類,如果你還需要更多的處理,恐怕還是需要自己去封裝LeaderSelector.

LeaderLatch latch = new LeaderLatch(client,"/task/leader");
//讓listener在單獨的執行緒池中執行
Executor executor = Executors.newCachedThreadPool();
//每個listener都用來執行角色變換的事件處理.
LeaderLatchListener latchListener = new LeaderLatchListener() {
	@Override
	public void isLeader() {
		System.out.println("I am leader...");
	}

	@Override
	public void notLeader() {
		System.out.println("I am not leader...");
	}
};
latch.addListener(latchListener,executor);
latch.start();
latch.await();//等待leader角色.
//在await退出之後,你需要通過其他手段繼續關注leader狀態變更.
System.out.println(latch.hasLeadership());
Thread.sleep(5000);
latch.close();
Thread.sleep(2000);
client.close();

?

??

? ? 到此為止,我們已經把curator-recipse的大部分API都介紹完畢了,希望我們的zookeeper開發之旅更加愉快.

轉:http://www.chengxuyuans.com/Java+/72042.html