zookeeper示例--(主從高可用)實時更新server列表
和balance高可用可以設計出很穩定的後臺任務架構。http://sling2007.blog.163.com/blog/static/84732713201362563350363/
場景描述
在分散式應用中, 我們經常同時啟動多個server, 呼叫方(client)選擇其中之一發起請求(又比如有很多執行任務的機器,但某個任務只需要一臺或幾臺機器上執行).
分散式應用必須考慮高可用性和可擴充套件性: server的應用程序可能會崩潰, 或者server本身也可能會宕機. 當server不夠時, 也有可能增加server的數量. 總而言之, server列表並非一成不變, 而是一直處於動態的增減中.
那麼client如何才能實時的更新server列表呢? 解決的方案很多, 本文將講述利用ZooKeeper的解決方案.
思路
啟 動server時, 在zookeeper的某個znode(假設為/sgroup)下建立一個子節點. 所建立的子節點的型別可以是EPHEMERAL_SEQUENTIAL或者EPHEMERAL(EPHEMERAL_SEQUENTIAL可以實現主從效果,EPHEMERAL則是對等的機器,沒有主從效果), 這樣一來, 如果server程序崩潰, 或者server宕機, 與zookeeper連線的session就結束了, 那麼其所建立的子節點會被zookeeper自動刪除. 當崩潰的server恢復後,
或者新增server時, 同樣需要在/sgroup節點下建立新的子節點.
對於client, 只需註冊/sgroup子節點的監聽, 當/sgroup下的子節點增加或減少時, zookeeper會通知client, 此時client更新server列表.
Ephemeral和SEQUENTIAL的型別可以查查文件,有很多應用場景的
實現
假設我們有三個類:
NewsWatcher:註冊和監聽zk中的節點變化
Task:假設我們的業務邏輯在這裡
Test:用於測試
下面直接給出原始碼啦。。。。。。。
newswatcher.java
package master_slave;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
publicclassNewsWatcher{
// 在zk上註冊的機器節點列表
publicstaticSortedSet<String> servers;
// 由zk自動遞增分配的節點id
publicstaticString myNodeID;
privateZooKeeper zk;
privatefinalStat stat =newStat();
// news應用的頂層目錄
privatefinalString spath ="/NewsWatcher";
// 分隔符
privatefinalString delimiter ="/";
publicNewsWatcher(String id)throwsException{
try{
// 建立一個與伺服器的連線
zk =newZooKeeper("127.0.0.1:2181",5000,newWatcher(){
@Override
publicvoid process(WatchedEventevent){
// System.out.println("node Change:" + event);
// 如果發生了spath節點下的子節點變化事件, 更新server列表, 並重新註冊監聽
if(event.getType()==EventType.NodeChildrenChanged
&& spath.equals(event.getPath())){
try{
updateServerList();
}catch(Exception e){
e.printStackTrace();
}
}
}
});
createParentDirectory();
createAppNode(id);
updateServerList();
}catch(Exception e){
e.printStackTrace();
}
}
// 建立一個子目錄節點
publicvoid createAppNode(String id)throwsException{
myNodeID = zk.create(spath + delimiter, id.getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
myNodeID = myNodeID.substring(myNodeID.lastIndexOf('/')+1);
}
// 如果不存在則建立頂層目錄
publicvoid createParentDirectory()throwsException{
Stat stat =null;
try{
stat = zk.exists(spath,true);
}catch(Exception e){
e.printStackTrace();
}
if(stat ==null){
zk.create(spath,"news watcher".getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}
// 更新伺服器列表
publicvoid updateServerList()throwsException{
SortedSet<String>set=newTreeSet<String>();
// 獲取並監聽spath的子節點變化
// watch引數為true, 表示監聽子節點變化事件.
// 每次都需要重新註冊監聽, 因為一次註冊, 只能監聽一次事件, 如果還想繼續保持監聽, 必須重新註冊
List<String> subList = zk.getChildren(spath,true);
for(String subNode : subList){
// 獲取每個子節點下關聯的server地址
// byte[] data = zk.getData(spath + delimiter + subNode, false,
// stat);
// System.out.println(subNode + "\t" + stat);
// String sdata = new String(data, "utf-8");
set.add(subNode);
}
servers =set;
}
// 關閉連線
publicvoid close()throwsInterruptedException{
zk.close();
}
}
task.java
package master_slave;
publicclassTaskimplementsRunnable{
@Override
publicvoid run(){
while(true){
try{
mybussiness();
Thread.sleep(10000l);
}catch(Exception e){
e.printStackTrace();
}
}
}
// 尋找id最小的節點來做任務(可以改成尋找最小的2個機器來做)
publicvoid mybussiness(){
System.out.println("all nodes:"+NewsWatcher.servers);
String minNode =NewsWatcher.servers.first();
System.out.println(NewsWatcher.myNodeID +" "+ minNode);
if(NewsWatcher.myNodeID.equals(minNode)){// 驗證本機是否是最小節點
System.out.println("i am the leader. i could do this job");
}else{
System.out.println("i am not the leader. i could not do this job");
}
}
}
test.java
package master_slave;
publicclassTest{
publicstaticvoid main(String[] args)throwsException{
String nodeid ="autoid";
if(args.length ==1){
nodeid = args[0];
}
newNewsWatcher(nodeid);
// 不讓程式退出,而是每5秒輪詢著做一個任務
newThread(newTask()).start();
}
}
啟動多個客戶端測試一下吧。。。O(∩_∩)O哈哈~