基於zookeeper的分散式佇列之同步佇列
阿新 • • 發佈:2018-11-02
1、同步佇列
同步佇列,顧名思義就是當佇列中的元素滿了的時候去做事情。
例如:一個公司組織旅遊,只有當報名了的員工全部到齊後司機才能出發。
我們把需求拆分開:
1、員工簽到,我們可以用zookeeper的znode節點來模擬,在/queue路徑下建立子節點,簽到一個建立一個znode節點
2、員工到齊,只有當/queue節點下的子節點滿了以後,才會通知司機發車,比如公司有100個人,那麼就是/queue節點下有100個節點是,才會通知司機出發
3、通知司機,如何通知司機呢?我們可以用zookeeper的事件通知機制,司機客戶端去監控某個節點,比如/start節點,當/queue佇列滿了的時候去建立一個/start節點,如此,監控了/start節點的客戶端(這裡就是司機客戶端)就會收到事件觸發。
同步佇列如圖:
OK~~~~~講了這麼多,直接上程式碼吧!!
這個是同步佇列的核心程式碼:
public class ZKQueue { private static String path = "/queue"; private static String start = "/start"; private static int limit; private ZooKeeper client = ZookeeperUtil.getClient(); public ZKQueue(int limit) { ZKQueue.limit = limit; init(); } private void init() { try { if(client.exists(path,false) == null) { client.create(path,"xx".getBytes(),ZooDefs.Ids .OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void push(String data) { try { //000000089,000000090,0000000091 /queue/0000000091 String currentPath = client.create(path + "/", data.getBytes(), ZooDefs.Ids .OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //佇列已經滿了 if(limit <= size()) { List<String> childrens = client.getChildren(path, false); Collections.sort(childrens); //二分法排序,拿到當前節點在childrens這個list中的位置? 只有當位置是最後一個的時候,我才 //建立start節點 int i = Collections.binarySearch(childrens,currentPath.substring (path .length() + 1)); //只有最後一個位置的時候我才建立start節點 if(i == (limit -1)) { System.out.println("佇列滿了,建立start節點~~~~"); client.create(start,data.getBytes(), ZooDefs.Ids .OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } //獲取到path節點下面的子節點個數 private int size() { try { return client.getChildren(path,false).size(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return 0; } }
ZookeeperUtil工具類程式碼:
public class ZookeeperUtil { private static String connectStr = "192.168.60.104:2181"; private static CountDownLatch cdl = new CountDownLatch(1); public static ZooKeeper getClient() { //CONNECTING 正在連線 CONNECTED 連線狀態 // watcher這個匿名類就是一個事件模板定義 try { ZooKeeper client = new ZooKeeper(connectStr, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //如果連線狀態變成了connected就會觸發這個事件 if (Event.KeeperState.SyncConnected == watchedEvent .getState()) { //第一次連線 if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { System.out.println("觸發了連線狀態事件!!" + watchedEvent .getType()); cdl.countDown(); } else if (Event.EventType.NodeCreated == watchedEvent .getType()) { System.out.println("觸發了節點建立事件" + watchedEvent.getType()); System.out.println(Thread.currentThread().getName () + "觸發了事件,並且得到了通知!!"); } else if (Event.EventType.NodeDataChanged == watchedEvent.getType()) { System.out.println("觸發了節點修改事件" + watchedEvent .getType()); } else if (Event.EventType.NodeDeleted == watchedEvent .getType()) { System.out.println("觸發了節點刪除事件" + watchedEvent .getType()); } } } }); cdl.await(); client.exists("/start",true); return client; } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return null; } }
測試類程式碼:
public class MyTest {
public static void main(String[] args) {
int limit = 10;
for (int i = 0; i < limit; i++) {
new Thread(new Runnable() {
@Override
public void run() {
ZKQueue queue = new ZKQueue(10);
queue.push("xx");
}
}).start();
}
}
}
OK,大功告成,這個就是同步佇列基於zookeeper的實現~~