1. 程式人生 > >zookeeper【6】負載均衡

zookeeper【6】負載均衡

負載均衡是一種手段,用來把對某種資源的訪問分攤給不同的裝置,從而減輕單點的壓力。

架構圖

 

圖中左側為ZooKeeper叢集,右側上方為工作伺服器,下面為客戶端。每臺工作伺服器在啟動時都會去zookeeper的servers節點下註冊臨時節點,每臺客戶端在啟動時都會去servers節點下取得所有可用的工作伺服器列表,並通過一定的負載均衡演算法計算得出一臺工作伺服器,並與之建立網路連線。網路連線我們採用開源框架netty。

流程圖

負載均衡客戶端流程

 

 

服務端主體流程

 

 

類圖

Server端核心類

 


每個服務端對應一個Server介面,ServerImpl是服務端的實現類。把服務端啟動時的註冊過程抽出為一個介面RegistProvider,並給予一個預設實現DefaultRegistProvider,它將用到一個上下文的類ZooKeeperRegistContext。我們的服務端是基於Netty的,它需要ServerHandler來處理與客戶端之間的連線,當有客戶端建立或失去連線時,我們都需要去修改當前伺服器的負載資訊,我們把修改負載資訊的過程也抽出為一個介面BalanceUpdateProvider,並且給予了一個預設實現DefaultBalanceUpdateProvider。ServerRunner是排程類,負責排程我們的Server。

Client端核心類

 


每個客戶端都需要實現一個Client介面,ClientImpl是實現,Client需要ClientHandler來處理與伺服器之間的通訊,同時它需要BalanceProvider為它提供負載均衡的演算法。BalanceProvider是介面,它有2個實現類,一個是抽象的實現AbstractBalanceProvider,一個是預設的實現DefaultBalanceProvider。ServerData是服務端和客戶端共用的一個類,服務端會把自己的基本資訊,包括負載資訊,打包成ServerData並寫入到zookeeper中,客戶端在計算負載的時候需要到zookeeper中拿到ServerData,並取得服務端的地址和負載資訊。ClientRunner是客戶端的排程類,負責啟動客戶端。

 

服務端程式碼:

/**
 * Contract for adjusting the load value of a work server.
 */
public interface BalanceUpdateProvider {

    /**
     * Increases the load.
     *
     * @param step amount by which the load is increased
     * @return success flag as reported by the implementation
     */
    boolean addBalance(Integer step);

    /**
     * Decreases the load.
     *
     * @param step amount by which the load is decreased
     * @return success flag as reported by the implementation
     */
    boolean reduceBalance(Integer step);

}
package com.test.cbd.zookeeper.loadbalance;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.zookeeper.data.Stat;

public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider {

    private String serverPath;
    private ZkClient zc;

    public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) {
        this.serverPath = serverPath;
        this.zc = zkClient;
    }

    public boolean addBalance(Integer step) {
        Stat stat = new Stat();
        ServerData sd;

        // 增加負載:讀取伺服器的資訊ServerData,增加負載,並寫回zookeeper
        while (true) {
            try {
                sd = zc.readData(this.serverPath, stat);
                sd.setBalance(sd.getBalance() + step);
                // 帶上版本,因為可能有其他客戶端連線到伺服器修改了負載
                zc.writeData(this.serverPath, sd, stat.getVersion());
                return true;
            } catch (ZkBadVersionException e) {
                // ignore
            } catch (Exception e) {
                return false;
            }
        }

    }

    public boolean reduceBalance(Integer step) {
        Stat stat = new Stat();
        ServerData sd;

        while (true) {
            try {
                sd = zc.readData(this.serverPath, stat);
                final Integer currBalance = sd.getBalance();
                sd.setBalance(currBalance>step?currBalance-step:0);
                zc.writeData(this.serverPath, sd, stat.getVersion());
                return true;
            } catch (ZkBadVersionException e) {
                // ignore
            } catch (Exception e) {
                return false;
            }
        }
    }

}
package com.test.cbd.zookeeper.loadbalance;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

public class DefaultRegistProvider implements RegistProvider {

    // 在zookeeper中建立臨時節點並寫入資訊
    public void regist(Object context) throws Exception {

        // Server在zookeeper中註冊自己,需要在zookeeper的目標節點上建立臨時節點並寫入自己
        // 將需要的以下3個資訊包裝成上下文傳入
        // 1:path
        // 2:zkClient
        // 3:serverData

        ZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context;
        String path = registContext.getPath();
        ZkClient zc = registContext.getZkClient();

        try {
            zc.createEphemeral(path, registContext.getData());
        } catch (ZkNoNodeException e) {
            String parentDir = path.substring(0, path.lastIndexOf('/'));
            zc.createPersistent(parentDir, true);
            regist(registContext);
        }
    }

    public void unRegist(Object context) throws Exception {
        return;
    }

}
package com.test.cbd.zookeeper.loadbalance;

/**
 * Contract for registering and unregistering using an implementation-defined context object.
 */
public interface RegistProvider {

    /**
     * Performs the registration described by {@code context}.
     *
     * @param context implementation-specific registration data
     * @throws Exception if the registration fails
     */
    void regist(Object context) throws Exception;

    /**
     * Reverts a registration described by {@code context}.
     *
     * @param context implementation-specific registration data
     * @throws Exception if the operation fails
     */
    void unRegist(Object context) throws Exception;
}
package com.test.cbd.zookeeper.loadbalance;

/**
 * A work server that can be bound.
 */
public interface Server {

    /** Binds the server. */
    void bind();
}
package com.test.cbd.zookeeper.loadbalance;

import java.io.Serializable;

public class ServerData implements Serializable,Comparable<ServerData> {

    private static final long serialVersionUID = -8892569870391530906L;


    private Integer balance;
    private String host;
    private Integer port;


    public Integer getBalance() {
        return balance;
    }
    public void setBalance(Integer balance) {
        this.balance = balance;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }


    @Override
    public String toString() {
        return "ServerData [balance=" + balance + ", host=" + host + ", port="
                + port + "]";
    }
    public int compareTo(ServerData o) {
        return this.getBalance().compareTo(o.getBalance());
    }

}
package com.test.cbd.zookeeper.loadbalance;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * 處理服務端與客戶端之間的通訊
 */
/**
 * Channel handler on the server side: adjusts the server's load through a
 * {@link BalanceUpdateProvider} whenever a channel becomes active or inactive.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /** Load change applied for each channel that becomes active or inactive. */
    private static final Integer BALANCE_STEP = 1;

    private final BalanceUpdateProvider balanceUpdater;

    /**
     * @param balanceUpdater provider used to raise and lower the load
     */
    public ServerHandler(BalanceUpdateProvider balanceUpdater){
        this.balanceUpdater = balanceUpdater;
    }

    public BalanceUpdateProvider getBalanceUpdater() {
        return this.balanceUpdater;
    }

    /** A channel became active: raise the load by one step. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("one client connect...");
        this.balanceUpdater.addBalance(BALANCE_STEP);
    }

    /** A channel became inactive: lower the load by one step. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.balanceUpdater.reduceBalance(BALANCE_STEP);
    }

    /** Prints the failure and closes the channel context. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
package com.test.cbd.zookeeper.loadbalance;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;


/**
 * Netty-based {@link Server} that registers itself in ZooKeeper before binding its port.
 *
 * <p>The node path is {@code serversPath + "/" + port}. {@link #bind()} blocks until the
 * server channel is closed; afterwards the event loop groups are shut down and the
 * ZooKeeper client is closed.
 */
public class ServerImpl implements Server {

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workGroup = new NioEventLoopGroup();
    private final ServerBootstrap bootStrap = new ServerBootstrap();
    private ChannelFuture cf;
    private final String zkAddress;
    private final String serversPath;
    private String currentServerPath;
    private ServerData sd;

    private volatile boolean binded = false;

    private final ZkClient zc;
    private final RegistProvider registProvider;

    private static final Integer SESSION_TIME_OUT = 10000;
    private static final Integer CONNECT_TIME_OUT = 10000;

    public String getCurrentServerPath() {
        return currentServerPath;
    }

    public String getZkAddress() {
        return zkAddress;
    }

    public String getServersPath() {
        return serversPath;
    }

    public ServerData getSd() {
        return sd;
    }

    public void setSd(ServerData sd) {
        this.sd = sd;
    }

    /**
     * @param zkAddress   ZooKeeper connection string
     * @param serversPath parent path under which this server registers itself
     * @param sd          description of this server (its port is used for the node name
     *                    and for binding)
     */
    public ServerImpl(String zkAddress, String serversPath, ServerData sd){
        this.zkAddress = zkAddress;
        this.serversPath = serversPath;
        this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer());
        this.registProvider = new DefaultRegistProvider();
        this.sd = sd;
    }

    /** Registers this server in ZooKeeper and remembers the node path that was used. */
    private void initRunning() throws Exception {
        String mePath = serversPath.concat("/").concat(sd.getPort().toString());
        registProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd));
        currentServerPath = mePath;
    }

    /**
     * Registers the server, binds the port and blocks until the server channel closes.
     *
     * <p>Returns immediately if the server is already bound, or (after printing the stack
     * trace) if the registration fails. Once the bind has been attempted, the event loop
     * groups are shut down and the ZooKeeper client is closed no matter how the method
     * exits, so the registration does not linger after a failed bind or a stopped server.
     * If the waiting thread is interrupted, its interrupt status is restored before
     * returning.
     */
    @Override
    public void bind() {

        if (binded){
            return;
        }

        System.out.println(sd.getPort()+":binding...");

        try {
            initRunning();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        bootStrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc)));
                    }
                });

        boolean interrupted = false;
        try {
            cf =  bootStrap.bind(sd.getPort()).sync();
            binded = true;
            System.out.println(sd.getPort()+":binded...");
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        }finally{
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            closeZkClient();
            if (interrupted) {
                // Restore the flag only after the cleanup above, which may itself block.
                Thread.currentThread().interrupt();
            }
        }

    }

    /** Closes the ZooKeeper client; a failure here is reported but never propagated. */
    private void closeZkClient() {
        try {
            zc.close();
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }

}
package com.test.cbd.zookeeper.loadbalance;

import java.util.ArrayList;
import java.util.List;

/**
 * 用於測試,負責啟動Work Server
 */
/**
 * Test launcher: starts {@code SERVER_QTY} work servers, each on its own thread, and
 * waits for all of them to finish.
 */
public class ServerRunner {

    private static final int  SERVER_QTY = 2;
    private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";
    private static final String  SERVERS_PATH = "/servers";

    public static void main(String[] args) {

        List<Thread> workers = new ArrayList<Thread>();

        // Ports are assigned consecutively starting at 6000.
        for (int index = 0; index < SERVER_QTY; index++) {
            Thread worker = new Thread(serverTask(6000 + index));
            workers.add(worker);
            worker.start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ignore) {
                // keep waiting for the remaining threads
            }
        }
    }

    /** Builds the task that creates one server on {@code port} and binds it. */
    private static Runnable serverTask(final int port) {
        return new Runnable() {

            public void run() {
                ServerData data = new ServerData();
                data.setBalance(0);
                data.setHost("127.0.0.1");
                data.setPort(port);
                Server server = new ServerImpl(ZOOKEEPER_SERVER, SERVERS_PATH, data);
                server.bind();
            }
        };
    }

}
package com.test.cbd.zookeeper.loadbalance;

import org.I0Itec.zkclient.ZkClient;

/**
 * Mutable holder for the three values used when registering in ZooKeeper:
 * the node path, the client, and the data to store.
 */
public class ZooKeeperRegistContext {

    private String path;
    private ZkClient zkClient;
    private Object data;

    /**
     * @param path     node path
     * @param zkClient ZooKeeper client
     * @param data     payload for the node
     */
    public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) {
        this.path = path;
        this.zkClient = zkClient;
        this.data = data;
    }

    // --- path ---

    public String getPath() {
        return this.path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    // --- client ---

    public ZkClient getZkClient() {
        return this.zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    // --- payload ---

    public Object getData() {
        return this.data;
    }

    public void setData(Object data) {
        this.data = data;
    }

}

客戶端程式碼:

package com.test.cbd.zookeeper.loadbalance.client;

import java.util.List;

public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> {

    protected abstract T balanceAlgorithm(List<T> items);
    protected abstract List<T> getBalanceItems();

    public T getBalanceItem(){
        return balanceAlgorithm(getBalanceItems());
    }

}
package com.test.cbd.zookeeper.loadbalance.client;

/**
 * Supplies the item selected by a load-balancing strategy.
 *
 * @param <T> type of the item
 */
public interface BalanceProvider<T> {

    /**
     * @return the selected item
     */
    T getBalanceItem();

}
package com.test.cbd.zookeeper.loadbalance.client;

/**
 * A client that can connect to and disconnect from a server.
 */
public interface Client {

    /**
     * Connects to the server.
     *
     * @throws Exception if connecting fails
     */
    void connect() throws Exception;

    /**
     * Disconnects from the server.
     *
     * @throws Exception if disconnecting fails
     */
    void disConnect() throws Exception;

}
package com.test.cbd.zookeeper.loadbalance.client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side channel handler whose only customisation is its error handling.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Prints the stack trace of {@code cause} and then closes the channel context.
     *
     * @param ctx   context of the channel on which the failure occurred
     * @param cause the failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        // An exception ends the connection.
        ctx.close();
    }
}
package com.test.cbd.zookeeper.loadbalance.client;

import com.test.cbd.zookeeper.loadbalance.ServerData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;

/**
 * Netty-based {@link Client} that asks a {@link BalanceProvider} which server to use and
 * opens a connection to it.
 */
public class ClientImpl implements Client {

    private final BalanceProvider<ServerData> provider;
    private EventLoopGroup group = null;
    private Channel channel = null;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /**
     * @param provider source of the server to connect to
     */
    public ClientImpl(BalanceProvider<ServerData> provider) {
        this.provider = provider;
    }

    public BalanceProvider<ServerData> getProvider() {
        return provider;
    }

    /**
     * Obtains a server from the provider and connects to it, blocking until the connection
     * attempt has finished.
     *
     * <p>Failures are reported on standard output and not rethrown. If the provider yields
     * no server, or the connection attempt fails, the event loop group created for the
     * attempt is shut down again so that no resources are left behind.
     */
    @Override
    public void connect(){

        try{

            // Ask the provider for the server to use, then connect to it.
            ServerData serverData = provider.getBalanceItem();
            if (serverData == null) {
                throw new IllegalStateException("no available server");
            }

            System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", it's balance:"+serverData.getBalance());

            group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ClientHandler());
                        }
                    });
            ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly();
            channel = f.channel();

            System.out.println("started success!");

        }catch(Exception e){

            System.out.println("連線異常:"+e.getMessage());
            // Do not leave the event loop group behind after a failed attempt.
            releaseGroup();

        }

    }

    /**
     * Closes the channel (if one was opened) and shuts down the event loop group (if one
     * exists). Safe to call when {@link #connect()} was never called or did not succeed.
     * Failures are logged and not rethrown.
     */
    @Override
    public void disConnect(){

        try{

            if (channel!=null) {
                channel.close().syncUninterruptibly();
                channel = null;
            }

            releaseGroup();

            log.debug("disconnected!");

        }catch(Exception e){

            // Pass the exception itself so the stack trace is not lost.
            log.error(e.getMessage(), e);

        }
    }

    /** Shuts down the event loop group, if there is one, and forgets it. */
    private void releaseGroup() {
        if (group != null) {
            group.shutdownGracefully();
            group = null;
        }
    }

}
package com.test.cbd.zookeeper.loadbalance.client;

import com.test.cbd.zookeeper.loadbalance.ServerData;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Test launcher: starts {@code CLIENT_QTY} clients, each on its own thread, waits for a
 * line on standard input, then disconnects all clients and joins the threads.
 */
public class ClientRunner {

    private static final int  CLIENT_QTY = 3;
    private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";
    private static final String  SERVERS_PATH = "/servers";

    public static void main(String[] args) {

        List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY);
        // Worker threads add to this list while the main thread reads it, so it must be
        // a thread-safe list.
        final List<Client> clientList =
                java.util.Collections.synchronizedList(new ArrayList<Client>(CLIENT_QTY));
        final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH);

        try{

            for(int i=0; i<CLIENT_QTY; i++){

                Thread thread = new Thread(new Runnable() {

                    public void run() {
                        Client client = new ClientImpl(balanceProvider);
                        clientList.add(client);
                        try {
                            client.connect();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                threadList.add(thread);
                thread.start();
                // Pause between client starts.
                Thread.sleep(2000);

            }

            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();


        }catch(Exception e){

            e.printStackTrace();

        }finally{
            // Disconnect the clients; iterate over a snapshot so that a worker still
            // adding to the list cannot disturb the iteration.
            List<Client> snapshot = new ArrayList<Client>(clientList);
            for (Client client : snapshot){
                try {
                    client.disConnect();
                } catch (Exception ignore) {
                    // best effort: continue with the remaining clients
                }
            }
            // Stop and wait for the worker threads.
            for (Thread thread : threadList){
                thread.interrupt();
                try{
                    thread.join();
                }catch (InterruptedException ignore){
                    // continue with the remaining threads
                }
            }
        }
    }
}
package com.test.cbd.zookeeper.loadbalance.client;

import com.test.cbd.zookeeper.loadbalance.ServerData;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * {@link AbstractBalanceProvider} that reads the {@link ServerData} of every child node
 * below a ZooKeeper path and selects the one with the smallest balance.
 */
public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> {

    private final String zkServer; // ZooKeeper connection string
    private final String serversPath; // path of the node whose children are the servers
    private final ZkClient zc;

    private static final Integer SESSION_TIME_OUT = 10000;
    private static final Integer CONNECT_TIME_OUT = 10000;

    /**
     * @param zkServer    ZooKeeper connection string
     * @param serversPath path of the node whose children describe the servers
     */
    public DefaultBalanceProvider(String zkServer, String serversPath) {
        this.serversPath = serversPath;
        this.zkServer = zkServer;

        this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT,
                new SerializableSerializer());

    }

    /**
     * Returns the item with the smallest balance; among equal balances the one that comes
     * first in {@code items} wins. The list itself is left unmodified.
     *
     * @param items candidate servers
     * @return the least-loaded server, or {@code null} if {@code items} is null or empty
     */
    @Override
    protected ServerData balanceAlgorithm(List<ServerData> items) {
        if (items == null || items.isEmpty()) {
            return null;
        }
        // A single pass finds the minimum; no need to sort the whole list.
        return Collections.min(items);
    }

    /**
     * Reads the {@link ServerData} of every child of the servers path.
     *
     * <p>A child that disappears between listing and reading is skipped instead of failing
     * the whole lookup.
     *
     * @return the data of all children that could be read
     */
    @Override
    protected List<ServerData> getBalanceItems() {

        List<String> children = zc.getChildren(this.serversPath);
        List<ServerData> sdList = new ArrayList<ServerData>(children.size());
        for (String child : children) {
            // 'true' makes readData return null instead of throwing when the node is gone.
            ServerData sd = zc.readData(serversPath + "/" + child, true);
            if (sd != null) {
                sdList.add(sd);
            }
        }
        return sdList;

    }

}