1. 程式人生 > >利用ZooKeeper API模擬HDFS節點的監聽模式

利用ZooKeeper API模擬HDFS節點的監聽模式


首先我們需要理解一件事情,雖然大多數人學習ZooKeeper是因為Hadoop和大資料,但實際上ZK只是分散式一致性演算法的實現,和大資料以及Hadoop並無任何關係。

ZK本身是一套樹樁結構的檔案系統,這個系統每個檔案節點可以存放一點兒資料。重點是這個檔案系統十分敏感,一旦任何資料發生變化,ZK就會檢測到並且報告給客戶端。

我們利用這個特性來監管分散式系統的伺服器,配置檔案,名稱空間等。

這裡我給出一個小例子,希望幫助初學者理解,ZK是如何實現上述功能的

準備工作:

1,通過ZK的客戶端視窗,建立一個空的檔案,用來作為分散式檔案系統的跟目錄

create /servergroup ""

2,我們通過ZK API寫一個監聽程式,如果/servergroup下面節點發生變化,我們就打印出最新的節點列表。

現實中每個節點代表一個伺服器,伺服器啟動時候,就會寫入一個節點檔案。伺服器斷開後,節點檔案就會消失。我們可以根據這一點判斷伺服器是否連線。

package com.dynamic.client;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
//客戶端監聽服務,監控ZK伺服器指定節點變化資訊
//如果znode(伺服器節點)的子節點發生變化,就會更新客戶端的serverlist
public class AppClient {
	//Application伺服器節點路徑
	private String groupnode = "/servergroup";
	private ZooKeeper zk;//客戶端例項
	private Stat stat= new Stat(); //儲存節點狀態資訊
	//儲存server列表資料
	//volatile多執行緒的情況下保持資料一致性
	private volatile List<String> serverlist;
	
	
	//建立連結伺服器的Client,第三個引數是建立監視器例項
	public void connectZK() throws Exception{
		
		String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper伺服器列表
		final int SESSION_TIMEOUT = 30000;//會話有效時間		
		
		//我們獲得了一個已經連線ZK伺服器的Client,這個Client會話有效時間是30秒,有一個watcher監控該例項
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
			@Override
			public void process(WatchedEvent event) {
				//如果發生子節點發生變化事件,就更新serverlist,並重新繫結watcher
				//子節點發生變化 && 時間的路徑是 "/servergroup"
				if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged && groupnode.equals(event.getPath())){
					
					try {
						//呼叫更新serverlist方法
						updateServerlist();
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			}
			
		});
		//建立物件後更新一次server列表
		System.out.println("---客戶端已經連線伺服器---");
		updateServerlist();	
	}
	
	//更新server列表的方法
	public void updateServerlist() throws Exception {
		List<String> newserverlist = new ArrayList<String>();
		//ture表示給groupnode新增watcher(watcher被觸發後就失效了)
		//返回所有子節點的名稱
		List<String> childlist = zk.getChildren(groupnode, true);
		
		for(String childname:childlist){
			//獲取節點的資料
			byte data [] =zk.getData(groupnode+"/"+childname, false,stat);
			String childdata = new String(data,"utf-8");//按照utf-8轉化為字串
			//新增child的全路徑
			newserverlist.add(groupnode+"/"+childname+"/"+childdata);			
		}
		serverlist = newserverlist;
		System.out.println("-----伺服器列表已經更新----"+new Date());
		for(String ss:serverlist){
			System.out.println(ss);
		}
		
	}
	
	public void handle() throws InterruptedException{
		Thread.sleep(Long.MAX_VALUE);		
	}
	
	public static void main(String[] args) throws Exception {
		AppClient ac = new AppClient();
		ac.connectZK();
		ac.handle();//執行緒等待
	}
}

測試:

我在客戶端輸入命令 [zk: localhost:2181(CONNECTED) 17] create /servergroup/server2 "192.168.1.2"
Created /servergroup/server2
[zk: localhost:2181(CONNECTED) 18] create /servergroup/server2 "192.168.1.5"
Node already exists: /servergroup/server2
[zk: localhost:2181(CONNECTED) 19] create /servergroup/server2 "192.168.1.9"
Node already exists: /servergroup/server2
[zk: localhost:2181(CONNECTED) 20] create /servergroup/server9 "192.168.1.9"
Created /servergroup/server9
[zk: localhost:2181(CONNECTED) 21] create /servergroup/server5 "192.168.1.5"
Created /servergroup/server5
[zk: localhost:2181(CONNECTED) 22] delete /servergroup/server5

eclipse端輸出以下資訊 ---客戶端已經連線伺服器---
-----伺服器列表已經更新----Thu Nov 02 18:30:24 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
-----伺服器列表已經更新----Thu Nov 02 18:35:18 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server2/"192.168.1.2"
-----伺服器列表已經更新----Thu Nov 02 18:35:42 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"
-----伺服器列表已經更新----Thu Nov 02 18:35:52 CST 2017
/servergroup/server5/"192.168.1.5"
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"
-----伺服器列表已經更新----Thu Nov 02 18:36:09 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"

現在我在重新寫一個程式,在eclipse上模擬建立一個臨時的,序列化的節點,並在3秒鐘後斷開連線。臨時節點意味著,斷開客戶端連線後這個節點就自動消失。

根據這個特性,伺服器如果斷開我們就可以迅速感知到,並且列出剩餘可用的伺服器。

package com.dynamic.client;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ServerMissTest {
	private String groupnode = "/servergroup";
	private ZooKeeper zk;
	private Stat stat= new Stat();
	
	void zkConnect() throws IOException{
		String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper伺服器列表
		final int SESSION_TIMEOUT = 30000;//會話有效時間		
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){

			@Override
			public void process(WatchedEvent event) {
				// TODO Auto-generated method stub
				System.out.println("--start watching--");
				System.out.println(event.toString());
			}
			
		});
	}
	
	void createNodeServer() throws KeeperException, InterruptedException{
		//建立一個臨時的,序列化的節點,模仿HDFS的namenode
		zk.create(groupnode+"/"+"HDFS-Namenode", "Namenode1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
	}
	
	public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
		ServerMissTest smt = new ServerMissTest();
		smt.zkConnect();
		//建立了一個臨時的節點,並在3秒鐘後斷開連線。這個臨時的幾點就會消失。
		smt.createNodeServer();		
		Thread.sleep(3000);		
		
		//我們希望監聽埠能夠檢測到節點消失,並且打印出現在活著的節點。
		
	}
}
測試結果:

在40分56秒,我啟動了程式,客戶端監聽到了

在41分27秒,執行緒3秒接收後了,但整個過程持續了30秒左右,客戶端監聽到了Namenode1消失,並且打印出了可以使用的伺服器列表

-----伺服器列表已經更新----Thu Nov 02 18:36:09 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"
-----伺服器列表已經更新----Thu Nov 02 18:40:56 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"
/servergroup/HDFS-Namenode0000000013/Namenode1
-----伺服器列表已經更新----Thu Nov 02 18:41:27 CST 2017
/servergroup/server3/"192.168.1.3"
/servergroup/server1/"192.168.1.1"
/servergroup/server9/"192.168.1.9"
/servergroup/server2/"192.168.1.2"


以上,今天感覺CSDN不能貼上圖片非常不爽,準備搬家部落格園吧。

相關推薦

利用ZooKeeper API模擬HDFS節點模式

首先我們需要理解一件事情,雖然大多數人學習ZooKeeper是因為Hadoop和大資料,但實際上ZK只是分散式一致性演算法的實現,和大資料以及Hadoop並無任何關係。 ZK本身是一套樹樁結構的檔案系統,這個系統每個檔案節點可以存放一點兒資料。重點是這個檔案系統十分敏感,

zookeeper增刪改查與API

//監聽單節點內容 public class WatchDemo{ public static void main(String[] args) throws Exception { private String connectString="ip1:2181,ip2:2181,ip

Springboot2(29)整合zookeeper的增刪改查、節點、分散式讀寫鎖、分散式計數器

原始碼地址 springboot2教程系列 實現zookeeper節點的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器 新增依賴 <properties> <project.build.sourceEncoding

zookeeper源碼之配置

com continue 點數據 lis process 節點數 hashset ace tree   配置存儲不僅維護了一個樹結構,還對各個節點添加了變更監聽。 類圖   DataTree內部維護兩個通知管理器,分別監聽節點數據變更和子節點變更。 public cl

Curator之PathChildrenCache子節點

Curator之PathChildrenCache子節點監聽: /*子節點監聽*/ //子節點新增watcher //PathChildrenCache:監聽資料節點的增刪改,會觸發事件 String childNodePathCach

JQuery 節點

into 修改 style doc 監聽 msu moved rom 其他 DOMSubtreeModified: 在DOM結構發生任何變化的時候。這個事件在其他事件觸發後都會觸發; 1 $(".attr_box").bind("DOMSubtreeModified"

oracle 10g rac一個節點狀態不正常,但crs_stat -t 顯示正常

$lsnrctl statusLSNRCTL for Solaris: Version 10.2.0.5.0 - Production on 18-NOV-2010 00:14:07Copyright (c) 1991, 2010, Oracle.  All rights

CuratorFramework節點二 TreeCache

節點監聽 2.TreeCache 特點: (1)永久監聽指定節點下的節點的變化 (2)可以監聽到指定節點下所有節點的變化,比如說指定節點”/example”, 在下面新增”node1”可以監聽到,但是新增”node1/n1”也能被監聽到 (3)可以監聽

zookeeper內部機制與註冊機制

zookeeper應用:You can use it off-the-shelf to implement consensus, group management, leader election, and presence protocols. And

分散式系統筆記:利用zookeeper實現分散式leader節點選舉

利用zookeeper實現分散式leader節點選舉 依賴原理 在ZK中新增基本節點,路徑程式定義,節點型別為持久節點(PERSISTENT)。 對需要競選leader的每個程序,在ZK中分別新增基本節點的子節點,型別為臨時自編號節點(EPHEMERA

RAC其中一個節點沒有起來的解決方案【ora.LISTENER.lsnr INTERMEDIATE】

RAC,crsctl stat res -t發現單節點監聽offline。 按照如下操作: srvctl stop listener srvctl stop scan_listener ps -eaf | grep tns 如果有,就殺掉 srvctl

設定樹莓派的無線網絡卡為模式(monitor)

先使用命令檢視無線網絡卡的名字: ifconfig 結果如下: eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 inet 192.168.123.165 netmask 255.

觀察者模式和事件模式的區別

監聽機制 其他 不包含 機制 監聽 多態 場景 觀察者模式 特定 事件監聽模式更像是觀察者模式的進階。 觀察者模式中,‘主題’會在特定邏輯下通知所有‘觀察者’。如果這個通知不包含任何信息,那麽這種實現就是通常的觀察者模式。 如果‘主題’通知‘觀察者’的過程帶有一些<其

springboot 中事件模式的一種實現

前言: 事件監聽模式是一種常用的設計模式,在springboot 中我們如何實現呢? 首先我們要理解事件監聽中需要的幾個角色 事件釋出者 (即事件源) 事件監聽者 事件本身 廢話不多說直接上程式碼 定義事件本身 事件本身需要繼承ApplicationEvent package com.yxd; impo

無線網絡卡模式

監聽模式(Monitor Mode),或RFMON (Radio Frequency MONitor),是指無線網絡卡可以接收所有經過它的資料流的工作方式,對應於IEEE 802.11網絡卡的其他模式,諸如Master(路由器)、Managed(普通模式的網絡卡)、Ad

iOS模式系列之NSNotificationCenter的簡單使用

NSNotificationCenter 對於這個沒必要多說,就是一個訊息通知機制,類似廣播。觀察者只需要向訊息中心註冊感興趣的東西,當有地方發出這個訊息的時候,通知中心會發送給註冊這個訊息的物件。這樣也起到了多個物件之間解耦的作用。蘋果給我們封裝了這個NSNotifica

從生活中領悟模式——坑爹的熱水器

【故事劇情】 剛剛大學畢業的Tony隻身來到北京這個碩大的城市,開始了北漂的生活。但剛剛畢業的他身無絕技、包無分文,為了生活只能住在沙河鎮一個偏僻的村子裡,每天坐著程式設計師專線(13號線)來回穿梭於昌平區與西城區…… 在一個寒冷的冬天,下班之

linux下將無線網絡卡工作模式切換為模式

網上的辦法有些遺漏,根據它的方法會報錯如下: 即裝置忙,因此需要先關閉無線網絡卡,在無線網絡卡關閉狀態下改變工作模式: 關閉後執行如下命令 將工作模式切換到監聽模式之後再開啟無線網絡卡 成功: 輸入iwconfig即可看見mode變成了Monitor 完畢。 注意:

JS實現模式和觀察者模式

[b]引子:[/b]最近看阮一峰先生的[url=http://www.ruanyifeng.com/blog/2012/12/asynchronous_javascript.html]這篇[/url]文章,文章涉及到觀察模式,監聽模式相關的設計模式的內容,正好,我最近也用sw

樹莓派無線網卡模式

dbm mac ada .com enc 方案 如果 說明 奇技淫巧 Background 項目裏需要在樹莓派上運行抓包程序,需要使用無線網卡,進入監聽模式,進行抓包; 默認的無線網卡是不支持monitor模式的,支持monitor模式的網卡列表可以在這裏查到 於是我選了