1. 程式人生 > >基於zookeeper的分散式佇列之同步佇列

基於zookeeper的分散式佇列之同步佇列

1、同步佇列

同步佇列,顧名思義就是當佇列中的元素滿了的時候去做事情。

例如:一個公司組織旅遊,只有當報名了的員工全部到齊後司機才能出發。

我們把需求拆分開:

1、員工簽到,我們可以用zookeeper的znode節點來模擬,在/queue路徑下建立子節點,簽到一個建立一個znode節點

2、員工到齊,只有當/queue節點下的子節點滿了以後,才會通知司機發車,比如公司有100個人,那麼就是/queue節點下有100個節點是,才會通知司機出發

3、通知司機,如何通知司機呢?我們可以用zookeeper的事件通知機制,司機客戶端去監控某個節點,比如/start節點,當/queue佇列滿了的時候去建立一個/start節點,如此,監控了/start節點的客戶端(這裡就是司機客戶端)就會收到事件觸發。

同步佇列如圖:

OK~~~~~講了這麼多,直接上程式碼吧!!

這個是同步佇列的核心程式碼:

public class ZKQueue {

    private static String path = "/queue";

    private static String start  = "/start";

    private static int limit;

    private ZooKeeper client = ZookeeperUtil.getClient();

    public ZKQueue(int  limit) {
        ZKQueue.limit = limit;
        init();
    }

    private void init() {
        try {
            if(client.exists(path,false) == null) {
                client.create(path,"xx".getBytes(),ZooDefs.Ids
                        .OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void push(String data) {
        try {
            //000000089,000000090,0000000091   /queue/0000000091
            String currentPath = client.create(path + "/", data.getBytes(), ZooDefs.Ids
                    .OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            //佇列已經滿了
            if(limit <= size()) {
                List<String> childrens = client.getChildren(path, false);
                Collections.sort(childrens);
                //二分法排序,拿到當前節點在childrens這個list中的位置? 只有當位置是最後一個的時候,我才
                //建立start節點
                int i = Collections.binarySearch(childrens,currentPath.substring
                        (path
                        .length() + 1));

                //只有最後一個位置的時候我才建立start節點
                if(i == (limit -1)) {
                    System.out.println("佇列滿了,建立start節點~~~~");
                    client.create(start,data.getBytes(), ZooDefs.Ids
                            .OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //獲取到path節點下面的子節點個數
    private int size() {
        try {
            return client.getChildren(path,false).size();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 0;
    }
}

ZookeeperUtil工具類程式碼:

public class ZookeeperUtil {

    private static String connectStr = "192.168.60.104:2181";

    private static CountDownLatch cdl = new CountDownLatch(1);

    public static ZooKeeper getClient() {
        //CONNECTING  正在連線  CONNECTED 連線狀態
        // watcher這個匿名類就是一個事件模板定義
        try {
            ZooKeeper client = new ZooKeeper(connectStr, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {

                    //如果連線狀態變成了connected就會觸發這個事件
                    if (Event.KeeperState.SyncConnected == watchedEvent
                            .getState()) {
                        //第一次連線
                        if (Event.EventType.None == watchedEvent.getType() &&
                                null == watchedEvent.getPath()) {
                            System.out.println("觸發了連線狀態事件!!" + watchedEvent
                                    .getType());
                            cdl.countDown();
                        } else if (Event.EventType.NodeCreated == watchedEvent
                                .getType()) {
                            System.out.println("觸發了節點建立事件" + watchedEvent.getType());
                            System.out.println(Thread.currentThread().getName
                                    () + "觸發了事件,並且得到了通知!!");
                        } else if (Event.EventType.NodeDataChanged ==
                                watchedEvent.getType()) {
                            System.out.println("觸發了節點修改事件" + watchedEvent
                                    .getType());
                        } else if (Event.EventType.NodeDeleted == watchedEvent
                                .getType()) {
                            System.out.println("觸發了節點刪除事件" + watchedEvent
                                    .getType());
                        }
                    }

                }
            });
            cdl.await();

            client.exists("/start",true);

            return client;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

        return null;
    }
}

測試類程式碼:

public class MyTest {

    public static void main(String[] args) {
        int limit = 10;

        for (int i = 0; i < limit; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ZKQueue queue = new ZKQueue(10);
                    queue.push("xx");
                }
            }).start();
        }
    }
}

OK,大功告成,這個就是同步佇列基於zookeeper的實現~~