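ZooKeeper Series (9): Implementing a Distributed Barrier and Queue with ZooKeeper
1. Quick Start
1.1 Overview
ZooKeeper began as a Hadoop sub-project and is now a top-level Apache project. It is a coordination service for distributed systems, mainly providing configuration management, naming, distributed synchronization and group services.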
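1.2 Common use cases
1.2.1 Unified configuration
The configuration is kept in a ZooKeeper znode. When it changes, clients that have set a watch on the node are notified and apply the latest configuration.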
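ZooKeeper watches are one-shot: after a watch fires, the client has to read the node again and set a new watch to hear about the next change. The sketch below models only that read-and-rewatch loop in plain Java. `FakeConfigNode` and `ConfigClient` are hypothetical in-memory stand-ins, not ZooKeeper classes; with a real client the equivalent call would be `zk.getData(path, watcher, stat)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for one config znode (NOT the ZooKeeper API).
// It mimics one rule of ZooKeeper watches: a watch fires once and is then gone.
class FakeConfigNode {
    private String data;
    private final List<Runnable> watchers = new ArrayList<>();

    FakeConfigNode(String initial) {
        this.data = initial;
    }

    // Read the current value and, optionally, register a one-shot watcher.
    synchronized String getData(Runnable watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return data;
    }

    // Change the value, then fire and discard every pending watcher.
    void setData(String newData) {
        List<Runnable> toFire;
        synchronized (this) {
            data = newData;
            toFire = new ArrayList<>(watchers);
            watchers.clear();
        }
        toFire.forEach(Runnable::run);
    }
}

// Client that re-reads the config and re-registers its watch on every notification.
class ConfigClient {
    private final FakeConfigNode node;
    private final Consumer<String> apply;

    ConfigClient(FakeConfigNode node, Consumer<String> apply) {
        this.node = node;
        this.apply = apply;
        refresh();
    }

    private void refresh() {
        // Reading and re-watching in one call, so the next update is not missed.
        apply.accept(node.getData(this::refresh));
    }

    public static void main(String[] args) {
        FakeConfigNode node = new FakeConfigNode("timeout=30");
        new ConfigClient(node, cfg -> System.out.println("applied " + cfg));
        node.setData("timeout=60");
    }
}
```

Forgetting to re-register inside the callback is a common bug: the client would see the first change and then silently stop receiving updates.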
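1.2.2 Cluster management
Each machine in the cluster creates an ephemeral znode. When a machine's session ends, its ephemeral znode disappears, and the other machines watching those nodes are notified.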
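A minimal in-memory sketch of that membership idea, assuming each ephemeral node is owned by exactly one session. `FakeMembership` is a hypothetical class, not part of ZooKeeper; in a real deployment the members would watch the parent with `getChildren(path, true)`, and the server would remove a node once its owner's session expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical in-memory model of ephemeral znodes (NOT the ZooKeeper API):
// every node records the session that created it, and ending that session
// removes the node and notifies whoever is watching the member list.
class FakeMembership {
    private final Map<String, Long> owners = new TreeMap<>(); // path -> session id
    private final List<Consumer<Set<String>>> listeners = new ArrayList<>();

    void addListener(Consumer<Set<String>> listener) {
        listeners.add(listener);
    }

    void createEphemeral(String path, long sessionId) {
        owners.put(path, sessionId);
        fire();
    }

    // Stands for an explicit close or a session expiry after a crash.
    void endSession(long sessionId) {
        if (owners.values().removeIf(owner -> owner == sessionId)) {
            fire();
        }
    }

    Set<String> members() {
        return new TreeSet<>(owners.keySet());
    }

    private void fire() {
        Set<String> snapshot = members();
        listeners.forEach(l -> l.accept(snapshot));
    }

    public static void main(String[] args) {
        FakeMembership cluster = new FakeMembership();
        cluster.addListener(m -> System.out.println("members: " + m));
        cluster.createEphemeral("/cluster/node1", 1L);
        cluster.createEphemeral("/cluster/node2", 2L);
        cluster.endSession(1L); // node1 "crashes"
    }
}
```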
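1.2.3 Distributed locks
Several clients try to create the same znode. Only one create succeeds, and that client holds the lock; the others receive a NodeExists error.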
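The "only one create succeeds" rule can be sketched without a server. Here `ConcurrentHashMap.putIfAbsent` stands in for creating the lock node (with a real client, something like `zk.create("/lock", data, acl, CreateMode.EPHEMERAL)`), and `FakeLockService` and `LockRace` are illustrative names only. Making the real lock node ephemeral is what releases the lock automatically if the holder crashes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in (NOT the ZooKeeper API): putIfAbsent plays the role of
// creating the lock znode, which succeeds for exactly one caller and fails for the rest.
class FakeLockService {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    boolean tryCreate(String path, String clientId) {
        return nodes.putIfAbsent(path, clientId) == null;
    }

    // Releasing = deleting the node; only the current owner's delete has an effect.
    void release(String path, String clientId) {
        nodes.remove(path, clientId);
    }
}

class LockRace {
    // Lets `clients` threads race for the same lock path and returns how many won.
    static int countWinners(int clients) {
        FakeLockService service = new FakeLockService();
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        CountDownLatch startSignal = new CountDownLatch(1);
        List<Future<Boolean>> attempts = new ArrayList<>();
        for (int i = 0; i < clients; i++) {
            String id = "client-" + i;
            attempts.add(pool.submit(() -> {
                startSignal.await(); // release all attempts at roughly the same time
                return service.tryCreate("/lock", id);
            }));
        }
        startSignal.countDown();
        int winners = 0;
        try {
            for (Future<Boolean> f : attempts) {
                if (f.get()) winners++;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return winners;
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

However many clients race, exactly one wins. Losing clients typically watch the lock node and try again once it is deleted.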
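1.3 Installation and configuration
Download ZooKeeper from the official download page and extract it. Go into the conf directory and create a file named zoo.cfg (the server reads conf/zoo.cfg by default; the distribution ships conf/zoo_sample.cfg as a starting point) with the following content:
tickTime=2000
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataLogDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort=4399
initLimit=5
syncLimit=2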
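tickTime: the basic time unit of ZooKeeper, in milliseconds.
dataDir: the directory where ZooKeeper stores its snapshots.
dataLogDir: the directory where transaction logs are written.
initLimit: at startup, how long a follower may take to connect to and sync with the leader, measured in ticks (units of tickTime); if it takes longer, the connection fails.
syncLimit: during normal operation, how far a follower may fall behind the leader, measured in ticks; if it exceeds this limit, the follower is dropped.
clientPort: the port that clients connect to.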
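initLimit and syncLimit are counted in ticks, so their real duration depends on tickTime. With the configuration above, initLimit = 5 × 2000 ms = 10000 ms and syncLimit = 2 × 2000 ms = 4000 ms. The sketch below does that arithmetic by loading the file with `java.util.Properties`, which is enough for simple `key=value` files like this one. It also derives the server's default session timeout range (2 to 20 ticks), which applies when minSessionTimeout and maxSessionTimeout are not set. `ZkTimeouts` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: reads a zoo.cfg-style text and converts tick-based limits to ms.
class ZkTimeouts {
    final long initLimitMs, syncLimitMs, minSessionMs, maxSessionMs;

    ZkTimeouts(String cfgText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        long tick = Long.parseLong(p.getProperty("tickTime").trim());
        initLimitMs = tick * Long.parseLong(p.getProperty("initLimit").trim());
        syncLimitMs = tick * Long.parseLong(p.getProperty("syncLimit").trim());
        // Server defaults when minSessionTimeout / maxSessionTimeout are not configured
        minSessionMs = 2 * tick;
        maxSessionMs = 20 * tick;
    }

    public static void main(String[] args) {
        String cfg = "tickTime=2000\nclientPort=4399\ninitLimit=5\nsyncLimit=2\n";
        ZkTimeouts t = new ZkTimeouts(cfg);
        System.out.println("initLimit = " + t.initLimitMs + " ms, syncLimit = " + t.syncLimitMs + " ms");
        System.out.println("session timeout range = " + t.minSessionMs + ".." + t.maxSessionMs + " ms");
    }
}
```

For this configuration it prints initLimit = 10000 ms, syncLimit = 4000 ms and a session timeout range of 4000..40000 ms, so a client that asks for a 1000 ms session is given 4000 ms instead.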
Next, go into the bin directory, start the ZooKeeper server and connect to it with the command-line client:
./zkServer.sh start
./zkCli.sh -server localhost:4399
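Setting up a cluster is almost the same as a single instance. Here we make two more copies of the downloaded ZooKeeper directory, three in total, and configure them as follows:
tickTime=2000
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataLogDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort=4399
initLimit=5
syncLimit=2
server.1=127.0.0.1:8880:9990
server.2=127.0.0.1:8881:9991
server.3=127.0.0.1:8882:9992
All three zoo.cfg files use this format, but dataDir, dataLogDir and clientPort must be different for each instance. In server.N=host:port1:port2, N is the server id, port1 is used by followers to connect to the leader, and port2 is used for leader election.
Then create a file named myid in the directory that dataDir points to, containing the N of the matching server.N entry: 1 in the first instance's myid, 2 in the second, and so on. Then start the instances one after another. The myid file can be created with a command like: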
echo "1" > myid
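Preparing three directories by hand is error-prone, so here is a hypothetical Java helper that writes each instance's zoo.cfg and myid. It assumes the instances live in `zookeeper0`, `zookeeper1` and `zookeeper2` under one base directory, each with its own conf folder, and that the client ports are 4399, 4400 and 4401; `PseudoClusterSetup` and that layout are illustrative and should be adapted to your machine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for a 3-instance cluster on one machine.
// Directory layout (zookeeper0..2 under a base dir) and ports 4399..4401 are assumptions.
class PseudoClusterSetup {
    static final String SERVERS =
            "server.1=127.0.0.1:8880:9990\n"
          + "server.2=127.0.0.1:8881:9991\n"
          + "server.3=127.0.0.1:8882:9992\n";

    // zoo.cfg for server.<id>; dataDir, dataLogDir and clientPort differ per instance.
    static String configFor(int id, Path base) {
        Path home = base.resolve("zookeeper" + (id - 1));
        return "tickTime=2000\n"
             + "dataDir=" + home.resolve("data") + "\n"
             + "dataLogDir=" + home.resolve("dataLog") + "\n"
             + "clientPort=" + (4398 + id) + "\n"
             + "initLimit=5\n"
             + "syncLimit=2\n"
             + SERVERS;
    }

    // Writes conf/zoo.cfg and data/myid; myid must hold the same id as server.<id>.
    static void writeInstance(int id, Path base) {
        Path home = base.resolve("zookeeper" + (id - 1));
        try {
            Files.createDirectories(home.resolve("conf"));
            Files.createDirectories(home.resolve("data"));
            Files.createDirectories(home.resolve("dataLog"));
            Files.write(home.resolve("conf").resolve("zoo.cfg"),
                    configFor(id, base).getBytes(StandardCharsets.UTF_8));
            Files.write(home.resolve("data").resolve("myid"),
                    String.valueOf(id).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("zk-cluster");
        for (int id = 1; id <= 3; id++) {
            writeInstance(id, base);
        }
        System.out.println("wrote configs under " + base);
    }
}
```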
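2. Working with ZooKeeper from Java
The most important part of learning any technology is putting it into practice, so the rest of this article focuses on that.
First, creating and deleting znodes.
Znodes come in two types: ephemeral and persistent. An ephemeral znode is deleted when the session of the client that created it ends, whether the client closes the session explicitly or the session expires after a failure; a persistent znode is not.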
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CreateGroup implements Watcher {
    // Session timeout in ms (the server raises it to at least 2 x tickTime)
    private static final int SESSION_TIMEOUT = 1000;
    // ZooKeeper client handle
    private ZooKeeper zk = null;
    // Released once the session is connected
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    // Called by the client when a connection event arrives
    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
            countDownLatch.countDown(); // decrement the counter
        }
    }

    public void connect(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await(); // block until SyncConnected
    }

    // Create the group node
    public void create(String groupName) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        // Any client may read/write this znode, and the znode is persistent
        String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + createPath);
    }

    // Close the session
    public void close() throws InterruptedException {
        if (zk != null) {
            try {
                zk.close();
            } finally {
                zk = null;
            }
        }
    }

    // Test entry point
    public static void main(String[] args) {
        String host = "127.0.0.1:4399";
        String groupName = "test";
        CreateGroup createGroup = new CreateGroup();
        try {
            createGroup.connect(host);
            createGroup.create(groupName);
            createGroup.close();
        } catch (IOException | InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}
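The CountDownLatch in CreateGroup exists because `new ZooKeeper(...)` returns before the session is established; the SyncConnected event arrives later on the client's event thread. The sketch below reproduces that handshake without a server: `FakeAsyncClient` and `LatchConnect` are hypothetical stand-ins, and the client fires "SyncConnected" after a configurable delay. It also uses `await(timeout, unit)`, a variant of the blocking wait that gives up instead of hanging forever when the server is unreachable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for new ZooKeeper(hosts, timeout, watcher): the constructor
// returns at once and the "SyncConnected" state is delivered later on another thread.
class FakeAsyncClient {
    FakeAsyncClient(Consumer<String> watcher, long connectDelayMillis) {
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(connectDelayMillis);
            } catch (InterruptedException e) {
                return;
            }
            watcher.accept("SyncConnected");
        });
        eventThread.setDaemon(true);
        eventThread.start();
    }
}

class LatchConnect {
    // Returns true if the session became connected within timeoutMillis.
    static boolean connect(long connectDelayMillis, long timeoutMillis) {
        CountDownLatch connected = new CountDownLatch(1);
        new FakeAsyncClient(state -> {
            if ("SyncConnected".equals(state)) {
                connected.countDown();
            }
        }, connectDelayMillis);
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast server: " + connect(50, 2000));
        System.out.println("slow server: " + connect(2000, 100));
    }
}
```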
Next, connecting and disconnecting are extracted into a separate class so that later examples can reuse them:
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class ConnetctionWatcher implements Watcher {
    private static final int SESSION_TIMEOUT = 5000;
    // Shared with subclasses such as DeleteGroup
    protected ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {
        KeeperState state = event.getState();
        if (state == KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
    }

    // Connect and block until the session is established
    public void connection(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();
    }

    public void close() throws InterruptedException {
        if (null != zk) {
            try {
                zk.close();
            } finally {
                zk = null;
            }
        }
    }
}
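Now let's see how to delete a node. ZooKeeper only deletes a node that has no children, so the children are deleted first: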
import java.util.List;

import org.apache.zookeeper.KeeperException;

public class DeleteGroup extends ConnetctionWatcher {
    // Deletes the group node and its direct children (one level deep only)
    public void delete(String groupName) {
        String path = "/" + groupName;
        try {
            List<String> children = zk.getChildren(path, false);
            for (String child : children) {
                zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1); // version -1 matches any version
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
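DeleteGroup only handles one level of children, and ZooKeeper refuses to delete a node that still has children (KeeperException.NotEmptyException). For deeper trees the nodes have to be removed in post-order, children before parents. The sketch below computes that order from a hypothetical in-memory map standing in for `zk.getChildren(path, false)`; `RecursiveDeleteOrder` is an illustrative name. Recent client libraries also provide `ZKUtil.deleteRecursive` for this job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: computes a safe deletion order (post-order) for a znode subtree.
// The Map is an in-memory stand-in for zk.getChildren(path, false).
class RecursiveDeleteOrder {
    static List<String> deletionOrder(String path, Map<String, List<String>> children) {
        List<String> order = new ArrayList<>();
        visit(path, children, order);
        return order;
    }

    private static void visit(String path, Map<String, List<String>> children, List<String> order) {
        for (String child : children.getOrDefault(path, List.of())) {
            visit(path + "/" + child, children, order);
        }
        order.add(path); // a node is deleted only after all of its children
    }

    public static void main(String[] args) {
        Map<String, List<String>> tree = Map.of(
                "/test", List.of("a", "b"),
                "/test/a", List.of("x"));
        System.out.println(deletionOrder("/test", tree));
    }
}
```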
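3. Implementing a distributed Barrier in Java
A barrier is a mechanism for coordinating when a group of tasks may proceed: it holds the tasks back until all of them are ready to run, then releases them together. On a single machine, CyclicBarrier provides this; in a distributed environment, ZooKeeper can play the same role. The simplest design uses one znode as the barrier: each task calls exists on that node with a watch and waits while it exists, and when the barrier should open the node is deleted, so ZooKeeper's watch mechanism notifies every task that it can start. The code below implements a slightly richer variant, a double barrier: each participant creates a child under the barrier node, enter() waits until `size` children exist, and leave() waits until all children are gone. Here is the code: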
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class Barrier extends SyncPrimitive {
    int size;
    String name;
    // Full path of this participant's node, including the sequence suffix added by ZooKeeper
    String myNode;

    Barrier(String address, String root, int size) {
        super(address);
        this.root = root;
        this.size = size;
        // Create the barrier node if it does not exist yet
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating barrier: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
        try {
            name = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            System.out.println(e.toString());
        }
    }

    /**
     * Join the barrier and wait until `size` participants have arrived.
     */
    boolean enter() throws KeeperException, InterruptedException {
        // EPHEMERAL_SEQUENTIAL appends a counter to the name, so keep the path actually created
        myNode = zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true); // also sets a child watch
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * Leave the barrier and wait until every participant has left.
     */
    boolean leave() throws KeeperException, InterruptedException {
        zk.delete(myNode, 0);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }
}
The parent class SyncPrimitive:
import java.io.IOException;
import java.util.Random;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SyncPrimitive implements Watcher {
    static ZooKeeper zk = null;
    // Lock/condition object shared by all primitives (a plain Object rather than a boxed Integer)
    static final Object mutex = new Object();
    // Root node of the primitive
    String root;

    SyncPrimitive(String address) {
        if (zk == null) {
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
    }

    // Any watch event (connection or child change) wakes up the waiting thread
    public synchronized void process(WatchedEvent event) {
        synchronized (mutex) {
            System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    public static void queueTest(String[] args) {
        Queue q = new Queue(args[1], "/app1");
        System.out.println("Input: " + args[1]);
        int max = Integer.parseInt(args[2]);
        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (int i = 0; i < max; i++) {
                try {
                    q.produce(10 + i);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } else {
            System.out.println("Consumer");
            for (int i = 0; i < max; i++) {
                try {
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e) {
                    i--; // retry this item
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void barrierTest(String[] args) {
        Barrier b = new Barrier(args[1], "/b1", Integer.parseInt(args[2]));
        try {
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if (!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        // Simulate some work
        Random rand = new Random();
        int r = rand.nextInt(100);
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        try {
            b.leave();
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Left barrier");
    }

    // Test entry point, for example:
    //   qTest localhost:4399 3 p   -> produce 3 elements
    //   qTest localhost:4399 3 c   -> consume 3 elements
    //   bTest localhost:4399 3     -> join a barrier of size 3
    public static void main(String[] args) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);
    }
}
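The section compares the ZooKeeper barrier with CyclicBarrier on a single machine. The sketch below makes that comparison concrete for the double barrier: each worker waits once where the ZooKeeper version calls enter() and once where it calls leave(). A CyclicBarrier resets after it trips, so one instance serves both phases. `LocalDoubleBarrier` is a hypothetical name, and this only coordinates threads inside one JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

// Single-JVM analogue of the double barrier: all workers enter, work, then all leave.
class LocalDoubleBarrier {
    // Runs `workers` threads through enter -> work -> leave and returns the event log.
    static List<String> run(int workers) {
        CyclicBarrier barrier = new CyclicBarrier(workers);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers); // one thread per worker
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            final int id = i;
            futures.add(pool.submit(() -> {
                log.add("enter-" + id);
                barrier.await(); // plays the role of Barrier.enter()
                log.add("work-" + id);
                Thread.sleep(ThreadLocalRandom.current().nextInt(50));
                barrier.await(); // plays the role of Barrier.leave()
                log.add("left-" + id);
                return null;
            }));
        }
        try {
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Whatever the thread timing, every "enter" entry in the log precedes every "work" entry, and every "work" entry precedes every "left" entry, which is exactly the guarantee the ZooKeeper double barrier provides across machines.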
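4. Distributed Queue
In a distributed environment a queue needs strong consistency guarantees, which suggests the following design. Use one znode as the queue and store the elements as its children, created in ZooKeeper's sequential mode, which automatically appends an increasing number to the node name. Offering an element is then a create, and taking one means deleting the child with the smallest sequence number. ZooKeeper applies all writes in the same order on every server, so every client sees the elements in the same order. Here is the code: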
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

/**
 * A producer-consumer message queue
 */
public class Queue extends SyncPrimitive {
    Queue(String address, String name) {
        super(address);
        this.root = name;
        // Create the queue node if it does not exist yet
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * Add an element to the queue
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        byte[] value = ByteBuffer.allocate(4).putInt(i).array();
        // PERSISTENT_SEQUENTIAL appends an increasing, zero-padded counter to "element"
        zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    }

    /**
     * Remove the oldest element from the queue and return it
     */
    int consume() throws KeeperException, InterruptedException {
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.isEmpty()) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    // The suffix is zero-padded, so the smallest name is the oldest element.
                    // Use the child name as-is: rebuilding it from a parsed int drops the
                    // padding and points at a node that does not exist.
                    String path = root + "/" + Collections.min(list);
                    System.out.println("Temporary value: " + path);
                    try {
                        byte[] b = zk.getData(path, false, null);
                        zk.delete(path, 0);
                        return ByteBuffer.wrap(b).getInt();
                    } catch (KeeperException.NoNodeException e) {
                        // Another consumer removed this element first; look again
                    }
                }
            }
        }
    }
}
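The queue depends on how ZooKeeper names sequential nodes: it appends a 10-digit, zero-padded counter maintained by the parent node, for example element0000000007. Because of the padding, sorting child names as strings also sorts them by age, which is why a consumer should keep the child's full name instead of rebuilding it from a parsed integer. The sketch below models that naming and the 4-byte int encoding used by produce()/consume() in memory; `SequentialQueueModel` is a hypothetical class, and a TreeMap stands in for the children of the queue node.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a sequential-node queue (NOT the ZooKeeper API).
class SequentialQueueModel {
    // Child name -> data; a TreeMap keeps names sorted like Collections.min over getChildren()
    private final TreeMap<String, byte[]> children = new TreeMap<>();
    private int counter = 0;

    // Mimics creating root + "/element" with PERSISTENT_SEQUENTIAL; returns the child name.
    String produce(int value) {
        String name = "element" + String.format("%010d", counter++);
        children.put(name, ByteBuffer.allocate(4).putInt(value).array());
        return name;
    }

    // Mimics consume(): take the smallest name, decode its 4-byte int, remove it; null if empty.
    Integer consume() {
        Map.Entry<String, byte[]> head = children.pollFirstEntry();
        return head == null ? null : ByteBuffer.wrap(head.getValue()).getInt();
    }

    public static void main(String[] args) {
        SequentialQueueModel q = new SequentialQueueModel();
        for (int i = 0; i < 12; i++) {
            q.produce(10 + i);
        }
        // element0000000010 sorts after element0000000002 thanks to the padding
        System.out.println(q.consume() + " " + q.consume());
    }
}
```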