zookeeper負載均衡和資料同步
阿新 • • 發佈:2018-12-21
如何利用zookeeper做負載均衡呢,並且能夠讓客戶端動態監控服務端的狀態,一旦有的伺服器掛掉,客戶端能夠迅速感知,從而做出調整。
先演示一遍:注意,本地要執行一個zookeeper,讓客戶端和服務端分別和zookeeper進行連線,能實時跟zookeeper保持聯絡。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import org.I0Itec.zkclient.ZkClient; import org.apache.log4j.Logger; //服務端 public class SimpleServer implements Runnable { private static Logger logger = Logger.getLogger(SimpleServer.class.getName()); public static void main(String[] args) throws IOException { int port = 18081; SimpleServer server = new SimpleServer(port); Thread thread = new Thread(server); thread.start(); } private int port; public SimpleServer(int port) { this.port = port; } private void regServer() { //向ZooKeeper註冊當前伺服器 ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000); String pathroot = "/test"; if (!client.exists(pathroot)) { logger.info("建立根節點:" + pathroot); client.createPersistent(pathroot); } String path = "/test/server" + port; if(client.exists(path)) { client.delete(path); } client.createEphemeral(path, "127.0.0.1:" + port); } @Override public void run() { ServerSocket server = null; try { server = new ServerSocket(port); regServer(); System.out.println("Server started at " + port); Socket socket = null; while (true) { socket = server.accept(); new Thread(new SimpleServerHandler(socket)).start(); } } catch(IOException ex) { ex.printStackTrace(); } finally { if (server != null) { try { server.close(); } catch (IOException e) {} } } } } class SimpleServerHandler implements Runnable { private Socket socket; public SimpleServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader( this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("Receive : " + body); out.println("Hello, " + body); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); } if (this.socket != null) { try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket = null; } } } }
package com.tecno.BoomPlayerLog.bigdata.zookeeper.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; //客戶端 public class SimpleClient { private static Integer pos = 0; private static List<String> servers = new ArrayList<>(); public static void main(String[] args) { initServerList(); SimpleClient client = new SimpleClient(); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = console.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } private static void initServerList() { // 啟動時從ZooKeeper讀取可用伺服器 String path = "/test"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); List<String> childs = zkClient.getChildren(path); servers.clear(); for (String p : childs) { servers.add(zkClient.readData(path + "/" + p)); } // 訂閱節點變化事件 zkClient.subscribeChildChanges("/test", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString())); servers.clear(); for (String p : currentChilds) { servers.add(zkClient.readData(path + "/" + p)); } System.out.println("Servers: " + servers.toString()); } }); } public static String getServer() { // return servers.get(new Random().nextInt(servers.size())); 隨機演算法 // 輪詢演算法 String server = null; if (pos >= servers.size()) { pos = 0; } server = servers.get(pos); pos++; return server; } public SimpleClient() { } public void send(String name) { String server = SimpleClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive : " + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
首先,先執行服務端的main方法釋出服務,注意修改埠號,以免埠號衝突啟動不了,這裡我們執行三次服務端。執行完結果如下:
還有兩個服務端的控制檯也是類似;
然後執行客戶端的main方法,執行完後,在控制檯上可以輸入任意字串,回車之後(可以重複操作),服務端就會接收訊息,並且返回給客戶端,代表接收請求,並且處理請求。效果如下:
這裡我傳送三次訊息,也得到服務端的回覆,我們再看服務端的顯示:
可見每個服務端都請求一次,就是輪詢的效果。然後現在關閉一個服務端,結果如下(必須過一會,要進行心跳檢查):
客戶端就會感知,現在再來發送訊息,就是其他兩臺服務端輪詢處理訊息了,同理 一旦新增服務端,客戶端也可以有效感知。
資料同步呢?就是利用zookeeper的Watcher機制來監控zookeeper資料節點內容的變化,不過只能一次性,我們需要改良,進行實時監控。相關程式碼如下:
import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class MonitoringZookeeperNodesContinuous {
private ZooKeeper zooKeeper;
{
try {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 6000, null);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test //我們讓客戶端訂閱節點,就能收到節點變化的內容了
public void testProcessClient() throws KeeperException, InterruptedException {
Stat stat = new Stat();
// 客服端訂閱節點
String path = "/server/address";
//獲取zookeeper更新後的版本資訊
String initResult = doProcess(path, stat);
System.err.println("initResult ==> " + initResult);
// 處理業務更新版本,載入最新版本的資訊到記憶體中
//為了讓程式不停止方便看效果
while (true) {
}
}
/**
* @Description: 以遞迴方式實現節點狀態的持續監控的方法
* @author yunyao.huang
* @throws InterruptedException
* @throws KeeperException
* @date 2018年8月7日
*/
public String doProcess(final String path, final Stat stat) throws KeeperException, InterruptedException {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
String innerResult = doProcess(path, stat);
System.out.println(innerResult);
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
byte[] data = zooKeeper.getData(path, watcher, stat);
return new String(data);
}
@Test
public void testWatcherOnce() throws KeeperException, InterruptedException {
Stat stat = new Stat();
// 客戶端訂閱的zookeeper的配置資訊所儲存的路徑節點
String path = "/server/address";
// 回撥函式
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.err.println("zookeeper的配置資訊發生改變了");
}
};
zooKeeper.getData(path, watcher, stat);
// 不能讓程式停止,否則看不到效果
while (true) {
}
}
@Test
public void testCMSSetDataServer() {
String path = "/server/address";
// CMS更新完的版本號
byte[] data = "Version71".getBytes();
try {
// 讓zookeeper更新COM的版本號 ,一更新,訂閱的client就會感知
zooKeeper.setData(path, data, -1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
執行testProcessClient()方法,然後執行testCMSSetDataServer()方法。效果如下:
每一次客戶端只要修改節點中的值,zookeeper立刻通知所有的客戶端,客戶端自然就會得到最新的訊息了,就可以實現資料同步了。
關於zookeeper,可以看下:這篇文章說的挺不錯的 ,看完就會對上面的操作有更深的體會了,知道原理還是非常有必要的。