zookeeper【6】負載均衡
負載均衡是一種手段,用來把對某種資源的訪問分攤給不同的裝置,從而減輕單點的壓力。
架構圖
圖中左側為ZooKeeper叢集,右側上方為工作伺服器,下面為客戶端。每臺工作伺服器在啟動時都會去zookeeper的servers節點下注冊臨時節點,每臺客戶端在啟動時都會去servers節點下取得所有可用的工作伺服器列表,並通過一定的負載均衡演算法計算得出一臺工作伺服器,並與之建立網路連線。網路連線我們採用開源框架netty。
流程圖
負載均衡客戶端流程
服務端主體流程
類圖
Server端核心類
每個服務端對應一個Server介面,ServiceImpl是服務端的實現類。把服務端啟動時的註冊過程抽出為一個介面RegistProvider,並給予一個預設實現DefaultRegistProvider,它將用到一個上下文的類ZooKeeperRegistContext。我們的服務端是給予Netty的,它需要ServerHandler來處理與客戶端之間的連線,當有客戶端建立或失去連線時,我們都需要去修改當前伺服器的負載資訊,我們把修改負載資訊的過程也抽出為一個介面BalanceUpdateProvider,並且給予了一個預設實現DefaultBalanceUpdateProvider。ServerRunner是排程類,負責排程我們的Server。
Client端核心類
每個客戶端都需要實現一個Client介面,ClientImpl是實現,Client需要ClientHandler來處理與伺服器之前的通訊,同時它需要BalanceProvider為它提供負載均衡的演算法。BalanceProvider是介面,它有2個實現類,一個是抽象的實現AbstractBalanceProvider,一個是預設的實現DefaultBalanceProvider。ServerData是服務端和客戶端共用的一個類,服務端會把自己的基本資訊,包括負載資訊,打包成ServerData並寫入到zookeeper中,客戶端在計算負載的時候需要到zookeeper中拿到ServerData,並取得服務端的地址和負載資訊。ClientRunner是客戶端的排程類,負責啟動客戶端。
服務端程式碼:
public interface BalanceUpdateProvider { // 增加負載 public boolean addBalance(Integer step); // 減少負載 public boolean reduceBalance(Integer step); }
package com.test.cbd.zookeeper.loadbalance; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkBadVersionException; import org.apache.zookeeper.data.Stat; public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider { private String serverPath; private ZkClient zc; public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) { this.serverPath = serverPath; this.zc = zkClient; } public boolean addBalance(Integer step) { Stat stat = new Stat(); ServerData sd; // 增加負載:讀取伺服器的資訊ServerData,增加負載,並寫回zookeeper while (true) { try { sd = zc.readData(this.serverPath, stat); sd.setBalance(sd.getBalance() + step); // 帶上版本,因為可能有其他客戶端連線到伺服器修改了負載 zc.writeData(this.serverPath, sd, stat.getVersion()); return true; } catch (ZkBadVersionException e) { // ignore } catch (Exception e) { return false; } } } public boolean reduceBalance(Integer step) { Stat stat = new Stat(); ServerData sd; while (true) { try { sd = zc.readData(this.serverPath, stat); final Integer currBalance = sd.getBalance(); sd.setBalance(currBalance>step?currBalance-step:0); zc.writeData(this.serverPath, sd, stat.getVersion()); return true; } catch (ZkBadVersionException e) { // ignore } catch (Exception e) { return false; } } } }
package com.test.cbd.zookeeper.loadbalance; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; public class DefaultRegistProvider implements RegistProvider { // 在zookeeper中建立臨時節點並寫入資訊 public void regist(Object context) throws Exception { // Server在zookeeper中註冊自己,需要在zookeeper的目標節點上建立臨時節點並寫入自己 // 將需要的以下3個資訊包裝成上下文傳入 // 1:path // 2:zkClient // 3:serverData ZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context; String path = registContext.getPath(); ZkClient zc = registContext.getZkClient(); try { zc.createEphemeral(path, registContext.getData()); } catch (ZkNoNodeException e) { String parentDir = path.substring(0, path.lastIndexOf('/')); zc.createPersistent(parentDir, true); regist(registContext); } } public void unRegist(Object context) throws Exception { return; } }
package com.test.cbd.zookeeper.loadbalance; public interface RegistProvider { public void regist(Object context) throws Exception; public void unRegist(Object context) throws Exception; }
package com.test.cbd.zookeeper.loadbalance; public interface Server { public void bind(); }
package com.test.cbd.zookeeper.loadbalance; import java.io.Serializable; public class ServerData implements Serializable,Comparable<ServerData> { private static final long serialVersionUID = -8892569870391530906L; private Integer balance; private String host; private Integer port; public Integer getBalance() { return balance; } public void setBalance(Integer balance) { this.balance = balance; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } @Override public String toString() { return "ServerData [balance=" + balance + ", host=" + host + ", port=" + port + "]"; } public int compareTo(ServerData o) { return this.getBalance().compareTo(o.getBalance()); } }
package com.test.cbd.zookeeper.loadbalance; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * 處理服務端與客戶端之間的通訊 */ public class ServerHandler extends ChannelHandlerAdapter { private final BalanceUpdateProvider balanceUpdater; private static final Integer BALANCE_STEP = 1; public ServerHandler(BalanceUpdateProvider balanceUpdater){ this.balanceUpdater = balanceUpdater; } public BalanceUpdateProvider getBalanceUpdater() { return balanceUpdater; } // 建立連線時增加負載 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("one client connect..."); balanceUpdater.addBalance(BALANCE_STEP); } // 斷開連線時減少負載 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { balanceUpdater.reduceBalance(BALANCE_STEP); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
package com.test.cbd.zookeeper.loadbalance; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; public class ServerImpl implements Server { private EventLoopGroup bossGroup = new NioEventLoopGroup(); private EventLoopGroup workGroup = new NioEventLoopGroup(); private ServerBootstrap bootStrap = new ServerBootstrap(); private ChannelFuture cf; private String zkAddress; private String serversPath; private String currentServerPath; private ServerData sd; private volatile boolean binded = false; private final ZkClient zc; private final RegistProvider registProvider; private static final Integer SESSION_TIME_OUT = 10000; private static final Integer CONNECT_TIME_OUT = 10000; public String getCurrentServerPath() { return currentServerPath; } public String getZkAddress() { return zkAddress; } public String getServersPath() { return serversPath; } public ServerData getSd() { return sd; } public void setSd(ServerData sd) { this.sd = sd; } public ServerImpl(String zkAddress, String serversPath, ServerData sd){ this.zkAddress = zkAddress; this.serversPath = serversPath; this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer()); this.registProvider = new DefaultRegistProvider(); this.sd = sd; } //初始化服務端 private void initRunning() throws Exception { String mePath = serversPath.concat("/").concat(sd.getPort().toString()); //註冊到zookeeper registProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd)); currentServerPath = mePath; } public void bind() { if (binded){ return; } System.out.println(sd.getPort()+":binding..."); try { initRunning(); } catch (Exception e) { e.printStackTrace(); return; } bootStrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc))); } }); try { cf = bootStrap.bind(sd.getPort()).sync(); binded = true; System.out.println(sd.getPort()+":binded..."); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
package com.test.cbd.zookeeper.loadbalance; import java.util.ArrayList; import java.util.List; /** * 用於測試,負責啟動Work Server */ public class ServerRunner { private static final int SERVER_QTY = 2; private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) { List<Thread> threadList = new ArrayList<Thread>(); for(int i=0; i<SERVER_QTY; i++){ final Integer count = i; Thread thread = new Thread(new Runnable() { public void run() { ServerData sd = new ServerData(); sd.setBalance(0); sd.setHost("127.0.0.1"); sd.setPort(6000+count); Server server = new ServerImpl(ZOOKEEPER_SERVER,SERVERS_PATH,sd); server.bind(); } }); threadList.add(thread); thread.start(); } for (int i=0; i<threadList.size(); i++){ try { threadList.get(i).join(); } catch (InterruptedException ignore) { // } } } }
package com.test.cbd.zookeeper.loadbalance; import org.I0Itec.zkclient.ZkClient; public class ZooKeeperRegistContext { private String path; private ZkClient zkClient; private Object data; public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) { super(); this.path = path; this.zkClient = zkClient; this.data = data; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } public ZkClient getZkClient() { return zkClient; } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } }
客戶端程式碼:
package com.test.cbd.zookeeper.loadbalance.client; import java.util.List; public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> { protected abstract T balanceAlgorithm(List<T> items); protected abstract List<T> getBalanceItems(); public T getBalanceItem(){ return balanceAlgorithm(getBalanceItems()); } }
package com.test.cbd.zookeeper.loadbalance.client; public interface BalanceProvider<T> { public T getBalanceItem(); }
package com.test.cbd.zookeeper.loadbalance.client; public interface Client { // 連線伺服器 public void connect() throws Exception; // 斷開伺服器 public void disConnect() throws Exception; }
package com.test.cbd.zookeeper.loadbalance.client; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
package com.test.cbd.zookeeper.loadbalance.client; import com.test.cbd.zookeeper.loadbalance.ServerData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; public class ClientImpl implements Client { private final BalanceProvider<ServerData> provider; private EventLoopGroup group = null; private Channel channel = null; private final Logger log = LoggerFactory.getLogger(getClass()); public ClientImpl(BalanceProvider<ServerData> provider) { this.provider = provider; } public BalanceProvider<ServerData> getProvider() { return provider; } public void connect(){ try{ ServerData serverData = provider.getBalanceItem(); // 獲取負載最小的伺服器資訊,並與之建立連線 System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", it's balance:"+serverData.getBalance()); group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ClientHandler()); } }); ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly(); channel = f.channel(); System.out.println("started success!"); }catch(Exception e){ System.out.println("連線異常:"+e.getMessage()); } } public void disConnect(){ try{ if (channel!=null) channel.close().syncUninterruptibly(); group.shutdownGracefully(); group = null; log.debug("disconnected!"); }catch(Exception e){ log.error(e.getMessage()); } } }
package com.test.cbd.zookeeper.loadbalance.client; import com.test.cbd.zookeeper.loadbalance.ServerData; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; public class ClientRunner { private static final int CLIENT_QTY = 3; private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) { List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY); final List<Client> clientList = new ArrayList<Client>(); final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH); try{ for(int i=0; i<CLIENT_QTY; i++){ Thread thread = new Thread(new Runnable() { public void run() { Client client = new ClientImpl(balanceProvider); clientList.add(client); try { client.connect(); } catch (Exception e) { e.printStackTrace(); } } }); threadList.add(thread); thread.start(); //延時 Thread.sleep(2000); } System.out.println("敲回車鍵退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); }catch(Exception e){ e.printStackTrace(); }finally{ //關閉客戶端 for (int i=0; i<clientList.size(); i++){ try { clientList.get(i); clientList.get(i).disConnect(); } catch (Exception ignore) { //ignore } } //關閉執行緒 for (int i=0; i<threadList.size(); i++){ threadList.get(i).interrupt(); try{ threadList.get(i).join(); }catch (InterruptedException e){ //ignore } } } } }
package com.test.cbd.zookeeper.loadbalance.client; import com.test.cbd.zookeeper.loadbalance.ServerData; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import java.util.ArrayList; import java.util.Collections; import java.util.List; public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> { private final String zkServer; // zookeeper伺服器地址 private final String serversPath; // servers節點路徑 private final ZkClient zc; private static final Integer SESSION_TIME_OUT = 10000; private static final Integer CONNECT_TIME_OUT = 10000; public DefaultBalanceProvider(String zkServer, String serversPath) { this.serversPath = serversPath; this.zkServer = zkServer; this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT, new SerializableSerializer()); } @Override protected ServerData balanceAlgorithm(List<ServerData> items) { if (items.size()>0){ Collections.sort(items); // 根據負載由小到大排序 return items.get(0); // 返回負載最小的那個 }else{ return null; } } /** * 從zookeeper中拿到所有工作伺服器的基本資訊 */ @Override protected List<ServerData> getBalanceItems() { List<ServerData> sdList = new ArrayList<ServerData>(); List<String> children = zc.getChildren(this.serversPath); for(int i=0; i<children.size();i++){ ServerData sd = zc.readData(serversPath+"/"+children.get(i)); sdList.add(sd); } return sdList; } }