1. 程式人生 > >Python與ZooKeeper叢集連線

Python與ZooKeeper叢集連線

由於專案的需要,需要學習Python客戶端連線ZooKeeper叢集,並實現建立臨時節點、獲得指定的路徑下的資訊、監聽子節點變化的功能。

環境配置

使用下面的命令安裝kazoo

pip install kazoo

基本使用

監聽子節點變化

下面的程式碼實現了建立一個臨時、順序的節點,並且可以監聽子節點的變化。

#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch




class ValidatorDetector:

    def __init__(self):
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun)
        self.zk.start()

    def validator_watcher_fun(self,children):
        print "The children now are:", children

    def create_node(self):
        self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True)

    def __del__(self):
        self.zk.close()





if __name__ == '__main__':
    detector = ValidatorDetector()
    detector.create_node()
    time.sleep(10)

ZooKeeper原生提供了監聽節點變化及值的變化的API。關於這一部分可以參考http://blog.csdn.net/mrbcy/article/details/54790758。但是這些API只能生效一次,一旦被觸發過一次以後就不會再觸發了,除非再次註冊。而kazoo則在這個基礎上封裝了更上層的API,可以持續的觸發。這就是上面的ChildrenWatch,除此之外kazoo還封裝了一個DataWatch,用於監聽資料的變化。下面我們也會用到。

註冊驗證器

有了上面的知識就可以做一個註冊類和一個監測類了。

#-*- coding: utf-8 -*-
import threading
import time
from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState

class InfoKeeper(threading.Thread):
    def __init__(self,register):
        threading.Thread.__init__(self)
        self.register=register

    def run(self):
        time.sleep(0.25)
        if self.register.zk_node is None:
            print "create method has not been called"
            return
        check_result = self.register.zk.exists(self.register.validator_path)
        if check_result is None:
            # redo the regist
            print "redo the regist"
            self.register.regist()
        else:
            print "the path remain exists"

class ValidatorRegister:
    def __init__(self):
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.zk_node = None
        self.validator_path = '/mproxy/validators/'
        self.zk.add_listener(self.conn_state_watcher)
        self.zk.start()


    def __del__(self):
        self.zk.close()

    def regist(self):
        self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True)

    def close(self):
        self.zk.stop()
        self.zk.close()

    def conn_state_watcher(self, state):
        if state == KazooState.CONNECTED:
            print "Now connected"

            if self.zk_node is None:
                print "create method has not been called"
                return
            info_keeper = InfoKeeper(self)
            info_keeper.start()
        elif state == KazooState.LOST:
            print "Now lost"
        else:
            print "Now suspended"

監測類:

#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch




class ValidatorDetector:

    def __init__(self):
        self.validator_path = '/mproxy/validators/'
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun)
        self.zk.start()

    def validator_watcher_fun(self,children):
        for child in children:
            validator_name = self.zk.get(path=self.validator_path + str(child))
            print validator_name[0]
        print "The children now are:", children


    def __del__(self):
        self.zk.close()

註冊類這裡稍微複雜了一點,做了一個在會話過期後重新註冊的機制,如果會話過期,重新註冊之前的註冊資訊。

監聽子節點值的變化

嗯,這個需求仔細想過後可以通過監聽子節點的變化來代替,所以暫時不實現了。