Python與ZooKeeper叢集連線
阿新 • • 發佈:2019-01-25
由於專案的需要,需要學習Python客戶端連線ZooKeeper叢集,並實現建立臨時節點、獲得指定的路徑下的資訊、監聽子節點變化的功能。
環境配置
使用下面的命令安裝kazoo
pip install kazoo
基本使用
監聽子節點變化
下面的程式碼實現了建立一個臨時、順序的節點,並且可以監聽子節點的變化。
#-*- coding: utf-8 -*- import time from kazoo.client import KazooClient from kazoo.recipe.watchers import ChildrenWatch class ValidatorDetector: def __init__(self): self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun) self.zk.start() def validator_watcher_fun(self,children): print "The children now are:", children def create_node(self): self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True) def __del__(self): self.zk.close() if __name__ == '__main__': detector = ValidatorDetector() detector.create_node() time.sleep(10)
ZooKeeper原生提供了監聽節點變化及值的變化的API。關於這一部分可以參考http://blog.csdn.net/mrbcy/article/details/54790758。但是這些API只能生效一次,一旦被觸發過一次以後就不會再觸發了,除非再次註冊。而kazoo則在這個基礎上封裝了更上層的API,可以持續的觸發。這就是上面的ChildrenWatch,除此之外kazoo還封裝了一個DataWatch,用於監聽資料的變化。下面我們也會用到。
註冊驗證器
有了上面的知識就可以做一個註冊類和一個監測類了。
#-*- coding: utf-8 -*- import threading import time from kazoo.client import KazooClient from kazoo.protocol.states import KazooState class InfoKeeper(threading.Thread): def __init__(self,register): threading.Thread.__init__(self) self.register=register def run(self): time.sleep(0.25) if self.register.zk_node is None: print "create method has not been called" return check_result = self.register.zk.exists(self.register.validator_path) if check_result is None: # redo the regist print "redo the regist" self.register.regist() else: print "the path remain exists" class ValidatorRegister: def __init__(self): self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.zk_node = None self.validator_path = '/mproxy/validators/' self.zk.add_listener(self.conn_state_watcher) self.zk.start() def __del__(self): self.zk.close() def regist(self): self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True) def close(self): self.zk.stop() self.zk.close() def conn_state_watcher(self, state): if state == KazooState.CONNECTED: print "Now connected" if self.zk_node is None: print "create method has not been called" return info_keeper = InfoKeeper(self) info_keeper.start() elif state == KazooState.LOST: print "Now lost" else: print "Now suspended"
監測類:
#-*- coding: utf-8 -*- import time from kazoo.client import KazooClient from kazoo.recipe.watchers import ChildrenWatch class ValidatorDetector: def __init__(self): self.validator_path = '/mproxy/validators/' self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun) self.zk.start() def validator_watcher_fun(self,children): for child in children: validator_name = self.zk.get(path=self.validator_path + str(child)) print validator_name[0] print "The children now are:", children def __del__(self): self.zk.close()
註冊類這裡稍微複雜了一點,做了一個在會話過期後重新註冊的機制,如果會話過期,重新註冊之前的註冊資訊。
監聽子節點值的變化
嗯,這個需求仔細想過後可以通過監聽子節點的變化來代替,所以暫時不實現了。