zookeeper HA 實現負載均衡
阿新 • • 發佈:2018-10-31
在網上看到太多千篇一律的zookeeper相關的文章,都是定義,沒有一個是有完整程式碼的,這對自己學習zk十分困難,其實要用zk實現主備切換、負載均衡其實沒有自己想象的那麼難,只需要瞭解zk的基本特性即可。在這裡貼上自己寫的程式碼與自己的理解,大家多多指教!
一、思路
軟負載說簡單點,就是將Client端的請求均勻的分配到不同 的server端。下面我們來說說實現的基本思路:
客戶端:
- 註冊:首先你需要確定一個父節點,在這裡父節點的名稱暫且就叫/parentNode;每個server端啟動時首先向zk的叢集的父節點/parentNode下去註冊一個臨時的子節點,這樣當有N臺server時,註冊的子節點就是/parentNode/server1、
- 獲取服務列表:在Client端實現輪詢分發的功能,實現Watcher介面,這會讓你實時的監控服務端的變化。首先去獲取父節點/parentNode下所有的子節點,得到之前存的ip與port,然後將這些列表快取到一map中,這裡就叫serverUrlCacheMap。由於實現了Watcher介面,當父節點發生變化時zk 的叢集會通知Client端,此時Client端只要重新獲取父節點下所有子節點的資料,重新快取即可
- 輪詢分發:定義一個全域性變數index ,每次發起請求時,直接去serverUrlCacheMap中獲取這個編號的URL,然後傳送給server,就可達到輪詢分發的功能。
package com.newcosoft.lsmp.bank.server; import java.rmi.Naming; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.newcosoft.lsmp.bank.client.config.BankConfig; import com.newcosoft.lsmp.bank.client.constant.BankConstant; import com.newcosoft.lsmp.bank.client.zookeeper.BankZookHelper; import com.newcosoft.lsmp.bank.server.biz.front.rmi.RmiService; import com.newcosoft.lsmp.bank.server.config.BankDatabaseConfig; import com.newcosoft.lsmp.bank.server.util.DefaultBeanFactory; import com.newcosoft.util.StringUtils; public class LsmpBankStart { private static Logger logger = LoggerFactory.getLogger(LsmpBankStart.class); private ZooKeeper zk; public static void main(String[] args) { try { // 啟動時,先註冊zk節點,重點在這裡 <strong><span style="color:#ff0000;">new LsmpBankStart().registerZook(args[0], args[1]);</span></strong> // 以下為你專案啟動的核心程式碼,這裡我專案的server端使用的是rmi協議,可以不用看了 DefaultBeanFactory bf = DefaultBeanFactory .getInstance("/spring/springContext_lsmp_bank.xml"); RmiService rmiService = (RmiService) bf.getBean("business"); int rmiPort = Integer.valueOf(args[1]); createRegistry(rmiPort); System.setProperty("java.rmi.server.hostname", args[0]); UnicastRemoteObject.exportObject(rmiService, rmiPort); String rmiUrl = "rmi://" + args[0] + ":" + rmiPort + "/" + args[2]; Naming.rebind(rmiUrl, rmiService); } catch (Exception e) { logger.error("LsmpBankStart failed!", e); System.exit(1); } } // 初始化zk的連線 private void initZk() { try { if (zk == null || !zk.getState().isAlive()) { synchronized (this) { if (zk != null) { zk.close(); } // 重新建立連線 zk = new ZooKeeper(BankConfig.getInstance().getZookURL(), BankConstant.HA_SESSION_TIMEOUT, this); while (zk.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(3000); } } } } catch (Exception e) { logger.error("zk初始化連線異常:" + e.toString()); } } private void registerZook(String ip, String port) { // 初始化zk initZk(); // 父節點路徑 String parentNode = BankConfig.getInstance().getBankServerParentNode(); String[] nodeList = parentNode.split("/"); String nodePath = ""; // 迴圈建立持久父節點 for (String node : nodeList) { if (!StringUtils.isEmpty(node)) { nodePath = nodePath + "/" + node; BankZookHelper.createNode(zk, nodePath, node, CreateMode.PERSISTENT); } } // 建立臨時子節點 //BankZookHelper為封裝的zookeeper一些基礎API的類 BankZookHelper.createNode(zk, nodePath + "/" + ip, ip + ":" + port, CreateMode.EPHEMERAL); } }<pre name="code" class="java">package com.newcosoft.lsmp.bank.client.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.newcosoft.lsmp.bank.client.config.BankConfig; import com.newcosoft.lsmp.bank.client.constant.BankConstant; import com.newcosoft.lsmp.bank.client.rmi.BankServerRMIClient; public class LoadBalanceZookHelper implements Watcher { private static Logger logger = LoggerFactory .getLogger(LoadBalanceZookHelper.class); private ZooKeeper zk; // 可用的服務列表 private List serverUrlList; // 獲取service 的索引 private AtomicInteger index = new AtomicInteger(0); // 快取的BankServerRMIClient private Map bankServerRMIClientCacheMap = new ConcurrentHashMap(); public LoadBalanceZookHelper() { // 初始化zk initZk(); cacheBankServerRMIClients(); } private static class SingletonHolder { static final LoadBalanceZookHelper INSTANCE = new LoadBalanceZookHelper(); } public static LoadBalanceZookHelper getInstance() { return SingletonHolder.INSTANCE; } // 輪詢分發服務 public BankServerRMIClient distributeServer() { if (index.get() >= Integer.MAX_VALUE) { index.set(0); } int a = index.get() % serverUrlList.size(); // 自增長 index.incrementAndGet(); String url = serverUrlList.get(a); if (logger.isDebugEnabled()) { logger.debug("請求的URL:" + url); } // TODO 刪掉syso System.out.println("請求的URL:" + url); return bankServerRMIClientCacheMap.get(url); } // 註冊rmi ,並快取 private void registerRmiAndCache(List serverUrlList) { for (String url : serverUrlList) { String host[] = url.split(":"); BankServerRMIClient bankServerRMIClient = new BankServerRMIClient( host[0], Integer.parseInt(host[1])); bankServerRMIClientCacheMap.put(url, bankServerRMIClient); } } // 初始化,快取rmi客戶端服務 public void cacheBankServerRMIClients() { // 獲取到所有的 serverURL serverUrlList = getServerUrlList(BankConfig.getInstance() .getBankServerParentNode()); if (logger.isDebugEnabled()) { logger.debug("重新快取可用的服務列表,serverUrlList:" + serverUrlList); } if (serverUrlList.size() > 0 && serverUrlList != null) { // 註冊並快取 registerRmiAndCache(serverUrlList); } else { logger.error("可用的服務列表為空"); throw new RuntimeException("可用的服務列表為空"); } } // 獲取zk連線 private void initZk() { try { if (zk == null || !zk.getState().isAlive()) { synchronized (this) { if (zk != null) { zk.close(); } // 重新建立連線 zk = new ZooKeeper(BankConfig.getInstance().getZookURL(), BankConstant.HA_SESSION_TIMEOUT, this); while (zk.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(3000); } } } } catch (Exception e) { logger.error("zk初始化連線異常:" + e.toString()); } } // 獲取可用的服務列表 public List getServerUrlList(String parentNodePath) { List serverList = new ArrayList(); try { // 獲取所有子節點 List nodePathList = zk.getChildren(parentNodePath, true); if (nodePathList.size() > 0 && nodePathList != null) { for (String hostPath : nodePathList) { hostPath = parentNodePath + "/" + hostPath; // 獲取子節點資料URL String data = new String(zk.getData(hostPath, true, null)); serverList.add(data); } } } catch (KeeperException e) { logger.error(e.toString()); } catch (InterruptedException e) { logger.error(e.toString()); } return serverList; } // 本來有個很簡單的辦法,直接將原來的快取去掉,然後在重新註冊,但是由於註冊rmi底層也是用所 tcp/ip協議,由於這個協議比較耗時, 所以使用以下方式。請大家根據實際情況來處理 // 節點發生變化時,重新快取rmi 客戶端服務 ,簡單來說就是更新快取 public void reCacheBankServerRMIClients() { // 將原來的serverURL 賦值給 List oldServerUrlList = serverUrlList; // 獲取最新的 serverURL,保留住 serverUrlList = getServerUrlList(BankConfig.getInstance() .getBankServerParentNode()); if (logger.isDebugEnabled()) { logger.debug("子節點發生變化,重新獲取的服務的URL:" + serverUrlList); } // 將最新的serverURL賦值給 List newServerUrlList = serverUrlList; List reducedUrlList = new ArrayList(); reducedUrlList.addAll(oldServerUrlList); List incrementUrlList = new ArrayList(); incrementUrlList.addAll(newServerUrlList); // 獲取down掉服務的的URL reducedUrlList.removeAll(newServerUrlList); // 獲取新增服務的URL incrementUrlList.removeAll(oldServerUrlList); if (reducedUrlList.size() > 0 && reducedUrlList != null) { for (String reducedUrl : reducedUrlList) { // 將down掉的URL的服務從快取中減掉 bankServerRMIClientCacheMap.remove(reducedUrl); } if (logger.isDebugEnabled()) { logger.debug("去除掉的服務的URL:" + reducedUrlList); } } if (incrementUrlList.size() > 0 && incrementUrlList != null) { // 將新增的URL重新註冊rmi 並快取住 registerRmiAndCache(incrementUrlList); if (logger.isDebugEnabled()) { logger.debug("新增的服務的URL:" + incrementUrlList); } } } @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.Expired) { logger.debug("觸發了回話過期事件"); // 重新連線zk initZk(); // 重新快取 reCacheBankServerRMIClients(); } // if (event.getState() == KeeperState.SyncConnected) { // logger.debug("觸發了斷開重連事件"); // // 重新快取 // reCacheBankServerRMIClients(); // } if (event.getType() == EventType.NodeChildrenChanged) { logger.debug("觸發了子節點變化事件"); // 重新快取 reCacheBankServerRMIClients(); } } public static void main(String[] args) { LoadBalanceZookHelper.getInstance(); } }
<div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;">
</div>
<pre name="code" class="java"><div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;"><br style="line-height: 1.5;" /></div><div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;"><strong>客戶端:</strong></div>
<span style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; line-height: 20px; text-indent: 28px; white-space: pre; background-color: rgb(240, 240, 240);">package com.newcosoft.lsmp.bank.client.zookeeper;</span>
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.newcosoft.lsmp.bank.client.config.BankConfig;
import com.newcosoft.lsmp.bank.client.constant.BankConstant;
import com.newcosoft.lsmp.bank.client.rmi.BankServerRMIClient;
public class LoadBalanceZookHelper implements Watcher {
private static Logger logger = LoggerFactory
.getLogger(LoadBalanceZookHelper.class);
private ZooKeeper zk;
// 可用的服務列表
private List serverUrlList;
// 獲取service 的索引
private AtomicInteger index = new AtomicInteger(0);
// 快取的BankServerRMIClient
private Map bankServerRMIClientCacheMap = new ConcurrentHashMap();
public LoadBalanceZookHelper() {
// 初始化zk
initZk();
cacheBankServerRMIClients();
}
private static class SingletonHolder {
static final LoadBalanceZookHelper INSTANCE = new LoadBalanceZookHelper();
}
public static LoadBalanceZookHelper getInstance() {
return SingletonHolder.INSTANCE;
}
// 輪詢分發服務
public BankServerRMIClient distributeServer() {
if (index.get() >= Integer.MAX_VALUE) {
index.set(0);
}
int a = index.get() % serverUrlList.size();
// 自增長
index.incrementAndGet();
String url = serverUrlList.get(a);
if (logger.isDebugEnabled()) {
logger.debug("請求的URL:" + url);
}
// TODO 刪掉syso
System.out.println("請求的URL:" + url);
return bankServerRMIClientCacheMap.get(url);
}
// 註冊rmi ,並快取
private void registerRmiAndCache(List serverUrlList) {
for (String url : serverUrlList) {
String host[] = url.split(":");
BankServerRMIClient bankServerRMIClient = new BankServerRMIClient(
host[0], Integer.parseInt(host[1]));
bankServerRMIClientCacheMap.put(url, bankServerRMIClient);
}
}
// 初始化,快取rmi客戶端服務
public void cacheBankServerRMIClients() {
// 獲取到所有的 serverURL
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("重新快取可用的服務列表,serverUrlList:" + serverUrlList);
}
if (serverUrlList.size() > 0 && serverUrlList != null) {
// 註冊並快取
registerRmiAndCache(serverUrlList);
} else {
logger.error("可用的服務列表為空");
throw new RuntimeException("可用的服務列表為空");
}
}
// 獲取zk連線
private void initZk() {
try {
if (zk == null || !zk.getState().isAlive()) {
synchronized (this) {
if (zk != null) {
zk.close();
}
// 重新建立連線
zk = new ZooKeeper(BankConfig.getInstance().getZookURL(),
BankConstant.HA_SESSION_TIMEOUT, this);
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
logger.error("zk初始化連線異常:" + e.toString());
}
}
// 獲取可用的服務列表
public List getServerUrlList(String parentNodePath) {
List serverList = new ArrayList();
try {
// 獲取所有子節點
List nodePathList = zk.getChildren(parentNodePath, true);
if (nodePathList.size() > 0 && nodePathList != null) {
for (String hostPath : nodePathList) {
hostPath = parentNodePath + "/" + hostPath;
// 獲取子節點資料URL
String data = new String(zk.getData(hostPath, true, null));
serverList.add(data);
}
}
} catch (KeeperException e) {
logger.error(e.toString());
} catch (InterruptedException e) {
logger.error(e.toString());
}
return serverList;
}
// 本來有個很簡單的辦法,直接將原來的快取去掉,然後在重新註冊,但是由於註冊rmi底層也是用所 tcp/ip協議,由於這個協議比較耗時, 所以使用以下方式。請大家根據實際情況來處理
// 節點發生變化時,重新快取rmi 客戶端服務 ,簡單來說就是更新快取
public void reCacheBankServerRMIClients() {
// 將原來的serverURL 賦值給
List oldServerUrlList = serverUrlList;
// 獲取最新的 serverURL,保留住
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("子節點發生變化,重新獲取的服務的URL:" + serverUrlList);
}
// 將最新的serverURL賦值給
List newServerUrlList = serverUrlList;
List reducedUrlList = new ArrayList();
reducedUrlList.addAll(oldServerUrlList);
List incrementUrlList = new ArrayList();
incrementUrlList.addAll(newServerUrlList);
// 獲取down掉服務的的URL
reducedUrlList.removeAll(newServerUrlList);
// 獲取新增服務的URL
incrementUrlList.removeAll(oldServerUrlList);
if (reducedUrlList.size() > 0 && reducedUrlList != null) {
for (String reducedUrl : reducedUrlList) {
// 將down掉的URL的服務從快取中減掉
bankServerRMIClientCacheMap.remove(reducedUrl);
}
if (logger.isDebugEnabled()) {
logger.debug("去除掉的服務的URL:" + reducedUrlList);
}
}
if (incrementUrlList.size() > 0 && incrementUrlList != null) {
// 將新增的URL重新註冊rmi 並快取住
registerRmiAndCache(incrementUrlList);
if (logger.isDebugEnabled()) {
logger.debug("新增的服務的URL:" + incrementUrlList);
}
}
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
logger.debug("觸發了回話過期事件");
// 重新連線zk
initZk();
// 重新快取
reCacheBankServerRMIClients();
}
// if (event.getState() == KeeperState.SyncConnected) {
// logger.debug("觸發了斷開重連事件");
// // 重新快取
// reCacheBankServerRMIClients();
// }
if (event.getType() == EventType.NodeChildrenChanged) {
logger.debug("觸發了子節點變化事件");
// 重新快取
reCacheBankServerRMIClients();
}
}
public static void main(String[] args) {
LoadBalanceZookHelper.getInstance();
}
}
客戶端: