1. 程式人生 > >解決socket負載均衡叢集方案和程式碼實現

解決socket負載均衡叢集方案和程式碼實現

有一段時間,在考慮下socket 之間叢集 可以在Nginx 下可以 但是不同伺服器之間怎麼通訊呢 後來自己也想可不可以用什麼東西或者中介軟體來通訊 ,後來在百度之下 發現果然就是按照我所想的 ,在網上看了一個方案,架構如下:


說一下這個方案他們之間的作用

redis : 存取使用者和伺服器的關係,如果A想發一條資料,是發給B ,這時A發起一個mq訂閱模式推送的資料 ,然後B收到資料處理好 ,在推送給使用者就ok 

mq : 是了橋接A和B 的之間的通訊

這樣A和B之間通訊就沒有問題了

以下是搭建過程

首先配置好nginx ,可以百度下載和安裝 ,我的路徑是如下 開啟配置nginx.conf   vim nginx.conf 配置如下



map $http_upgrade $connection_upgrade {

default upgrade;

'' close;

}

upstream ws_name {

server 127.0.0.1:12345 weight=1; ## weight 權重越大越大獲取連結機會就越大

server 127.0.0.1:12346 weight=1;

}

server {

listen 9999;

server_namelocalhost;

location / {

proxy_pass http://ws_name/;

proxy_http_version 1.1;

proxy_set_header Upgrade $http_upgrade;

proxy_set_header Connection "Upgrade";

### 以下配置是為了解決在nginx 下 socket 短時間的斷開 ,即使你配置長連結也沒用 必須有如下的配置

proxy_connect_timeout 4s;

proxy_read_timeout 600s; #這個配置連線保持多長時間 這配置十分鐘 ,然後可以自己心跳來保證長連結

proxy_send_timeout 12s;

}

error_page 500 502 503 504/50x.html;

location = /50x.html {

root html;

}

以上是nginx的配置 然後啟動就ok 啟動 命令 :./nginx 

檢視是否啟動 命令  ps -ef |grep nginx

 然後 mq 我的mq是 activemq  這個可以百度看看怎麼安裝 ,因為這個安裝比較簡單所以就不說太多了 

接下來是程式碼的實現了 :

package com.yw.socket;

import com.yw.mq.Producer;
import com.yw.redis.JedisUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import javax.jms.JMSException;
import java.util.Date;
import java.util.Iterator;


/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:29
 **/
public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private String URLSTR="ws://%s:%s/%s";

    private String ws="";

    private String port="";



    private WebSocketServerHandshaker handshaker;

    public SocketServerHandler(String url,String port,String name){
        ws = String.format(URLSTR, url,port,name);
        this.port=port;

    }

    public SocketServerHandler(String port,String name){
        this("127.0.0.1",port,name);
    }

    public  SocketServerHandler(String port){
        this("127.0.0.1",port,"websocket");
    }

    public  SocketServerHandler(){

    };
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {


        if (msg instanceof FullHttpRequest) {
            System.out.println("handleHttpRequest  FullHttpRequest");
            ///處理第一次連線過來的請求
            handleHttpRequest(channelHandlerContext,(FullHttpRequest)msg);
        }  else if (msg instanceof WebSocketFrame) {

            handlerWebSocketFrame(channelHandlerContext,(WebSocketFrame)msg);
        }

    }



    /**
     * 新增新的連線
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
           System.out.println("有新的連線進來了額  " + ctx.channel().id() + " 連線時間是 " + new Date().toLocaleString());
          //新的連線,儲存對應服務物件和和所在服務,這裡用 channel.id 當做key只是演示 真實不可,可以根據的自己業務需求去分析
           JedisUtil JedisUtil=new JedisUtil();
           JedisUtil.setKey(ctx.channel().id().toString(),this.port);
           JedisUtil.closeResource();

           MapChannerlGlobal.GROUP.add(ctx.channel());
    }

    /**
     * 刪除關閉連線
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        MapChannerlGlobal.GROUP.add(ctx.channel());
        System.out.println("刪除該連結" + ctx.channel().id() +  "斷開連線時間 " + new Date().toLocaleString());
        //刪除對應key
        JedisUtil JedisUtil=new JedisUtil();
        JedisUtil.delKeyString(ctx.channel().id().toString());
        JedisUtil.closeResource();

    }


    /**
     *  處理髮過來的資訊
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){

        System.out.println("handlerWebSocketFrame-->> "+frame+frame.getClass().getSimpleName());

        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame);
            System.out.println("關閉連線---CloseWebSocketFrame");
        }
        //不是二進位制的資料
        if (!(frame instanceof TextWebSocketFrame)){
            System.out.println("暫時只有二進位制資料處理");
            return ;
        }

        String msg = ((TextWebSocketFrame) frame).text();
        if(msg.indexOf(":")>=0){
            String[] split = msg.split(":");
            //獲取該使用者所在的伺服器
            JedisUtil JedisUtil=new JedisUtil();
            String cid = JedisUtil.getkeyStr(split[0]);
            JedisUtil.closeResource();//關閉redis
            //如果埠號不相同的話 說明該伺服器不在同一個  所以借用中介軟體推送
            if(!cid.equals(this.port)){
                try {
                    new Producer().sendTocp(msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{
                ///如果相同則直接推送
                ChannelGroup group = MapChannerlGlobal.GROUP;
                Iterator<Channel> iterator = group.iterator();
                while(iterator.hasNext()){
                    if(cid.equals(iterator.next().id().toString())){
                        //推送資料
                        TextWebSocketFrame  tws = new TextWebSocketFrame(split[1]);
                        iterator.next().writeAndFlush(tws);
                    }
                }


            }
            return  ;
        }

        //其他訊息

        System.out.println("客戶端發過來的的資料: "+msg);
        long time = new Date().getTime();
        TextWebSocketFrame tws =new TextWebSocketFrame(" 這是伺服器回饋的資料 時間:"+ time +" 通道的 id " +ctx.channel().id() );
        ctx.channel().writeAndFlush(tws);

    }



    /**
     * 傳送響應回饋請求者
     * @param ctx
     * @param req
     * @param resp
     */
    private void sendHttpResponse(ChannelHandlerContext ctx ,FullHttpRequest req,DefaultFullHttpResponse resp){

        if (resp.getStatus().code()!=200){

            ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);
            resp.content().writeBytes(buffer);
            buffer.release();
        }

        //響應請求
        ChannelFuture f=ctx.channel().writeAndFlush(resp);

        if ( resp .getStatus().code()!=200){
            f.addListener(ChannelFutureListener.CLOSE);///發起關閉
        }
    }


    /**
     * 處理髮過來的請求
     * @param ctx
     * @param req
     */
    private  void  handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){

        if (req.getDecoderResult().isSuccess() && (!"websocket".equals(req.headers().get("Upgrade")))) {
            ///建立一個響應者
            DefaultFullHttpResponse resp=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST,false);

           sendHttpResponse(ctx,req,resp);
        }

        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(ws,null,false);

        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                    .sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }

    }


    /**
     * 讀取完通道流 進行重新整理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        ctx.flush();
    }









}


package com.yw.socket;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:28
 **/
public class MapChannerlGlobal {

    public static ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


}
package com.yw.socket;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.util.Iterator;

/**
 * @program: mqYw
 * @description: 接受訊息佇列的資料
 * @author: yw
 * @create: 2018-07-06 21:58
 **/
public class ConsumerSocket implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        ChannelGroup group = MapChannerlGlobal.GROUP;
        TextWebSocketFrame tws = null;
        Channel next = null;
        Iterator<Channel> iterator = group.iterator();
        try {
            // 這是一個標誌  如果是 key:msg  ,key 是Channel的id  msg是訊息 演示用 真實中不能這楊 可以根據自己的需求去定這個key
            if( textMessage.getText().toString().indexOf(":")>=0){
                String[] strings = textMessage.getText().toString().split(":");
                while (iterator.hasNext()) {
                    next = iterator.next();
                    ///這裡用Channel的id當鍵值是不合理 只是用來演示而已
                    if(next.id().toString().equals(strings[0])){
                        try {
                            //推送資料
                            tws = new TextWebSocketFrame(textMessage.getText());
                            next.writeAndFlush(tws);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }



    }


}

package com.yw.socket;

import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;

import io.netty.channel.socket.SocketChannel;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:37
 **/
public class ChannelInitializerHandler extends ChannelInitializer<SocketChannel> {

    public ChannelInitializerHandler() {

    }
    //埠號
    private  String  port="";
    public ChannelInitializerHandler(String port) {
        this.port=port;
    }


    @Override
    protected void initChannel(SocketChannel cl) throws Exception {
             cl.pipeline().addLast("http-codec",new HttpServerCodec())
                ///接受資料長度
                .addLast("aggregator",new HttpObjectAggregator(1024*1024))
                // 設定當有新的模組時候要恢復傳輸
                .addLast("http-chunked",new ChunkedWriteHandler())

                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                //新增自己定義handler
                .addLast("handler",new SocketServerHandler(port,"ts"));
    }
}

package com.yw.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 22:28
 **/
public class JedisTools {


    private static JedisPool jedisPool = null;
   ///連線ip
    private  static final String ADDRIP="127.0.0.1";
    //埠號
    private static final  Integer PORT = 6379;
    ///密碼
    private  static  final  String PWD="123456";


    static {
        if(jedisPool==null){
            JedisPoolConfig config=new JedisPoolConfig();
            //設定最大連線數 預設是8   , -1表示 無限
            config.setMaxTotal(10);
            //最多有多少個狀態為idle(空閒的)的jedis例項  預設是 8
            config.setMaxIdle(10);
            ///等待可用連線的最大時間,單位毫秒,預設值為-1,表示永不超時
            config.setMaxWaitMillis(300);
            ///在borrow一個jedis例項時,是否提前進行validate操作;如果為true,則得到的jedis例項均是可用的;
            config.setTestOnBorrow(true);
            jedisPool=new JedisPool(config,ADDRIP,PORT,3000);

        }

    }


    public synchronized static Jedis getJedis(){

        if(jedisPool!=null){
            Jedis j= jedisPool.getResource();
            return j;
        }

        return null;

    }




}

package com.yw.redis;

import redis.clients.jedis.Jedis;

import java.io.*;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-07 12:46
 **/
public class JedisUtil {


    private Jedis jedis;

    public JedisUtil(){
        this.jedis = JedisTools.getJedis();
    }


    public void setKey(String key,Object obj){

        jedis.set(key.getBytes(),objToByte(obj));

    }


    public Object getKeyObj(String key){
        return byteToObj(jedis.get(key.getBytes()));
    }



    public void  setKey(String key,String value){
        jedis.set(key,value);
    }


    public String getkeyStr(String key){
        return jedis.get(key);
    }


    public void closeResource(){

        if(this.jedis!=null){
        ///    this.jedis.flushAll();
            this.jedis.close();
        }

    }


    public void delKeyString(String key){
        this.jedis.del(key);
    }


    public void delKeyBytes(String key){
        this.jedis.del(key.getBytes());

    }
    /**
     * 序列化轉為obj
     * @param
     * @return
     */
    public Object byteToObj(byte[] bt){
        Object obj=null;
        if(bt==null) return null;
        ByteArrayInputStream bis=null;
        ObjectInputStream ois=null;
        try {
            bis=new ByteArrayInputStream(bt);
            ois=new ObjectInputStream(bis);

            obj=ois.readObject();

        } catch (Exception e) {
         e.printStackTrace();
        }finally{
            try {
                if(bis!=null){
                    bis.close();
                }
                if(ois!=null){
                    ois.close();
                }
            } catch (IOException e1) {
                System.out.println(e1);
            }
        }
        return obj;
    }


    /**
     * obj 序列化
     * @param obj
     * @return
     */
    public byte[] objToByte(Object obj){
        if(obj==null) return null;
        byte [] bt=null;
        ByteArrayOutputStream bos=null;
        ObjectOutputStream oos=null;
        try {
            bos=new ByteArrayOutputStream();
            oos=new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bt=bos.toByteArray();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                if(bos!=null){
                    bos.close();
                }
                if(oos!=null){
                    oos.close();
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bt;
    }


    public static void main(String[] args) {
        JedisUtil JedisUtil=new JedisUtil();

        JedisUtil.setKey("test","123");
        System.out.println(JedisUtil.getkeyStr("test"));
        JedisUtil.closeResource();

    }
}

public class Producer {
    private static final   String TOPIC ="test-topic";

    //訂閱模式
    public void sendTocp(String msg) throws JMSException {
//TOPIC
        Connection connection = ConnectionUtil.getConnection(URL);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(TOPIC);
        MessageProducer producer = session.createProducer(topic);

        TextMessage textMessage=null;

        textMessage = session.createTextMessage(msg);
        producer.send(textMessage);

        producer.close();
        session.close();
        connection.close();
    }
}
package test.mq;

import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import com.yw.socket.SocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import javax.jms.*;

/**
 * 啟動 服務A 
 *
 */
public class App

{

    private static final   String TOPIC ="test-topic";
    private static final   String URL ="tcp://127.0.0.1:61616";
    private static final   String QUEUENAME ="test-queue";

    public void msgTocp() throws JMSException {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection(URL);
        ///建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //訂閱模式
        Topic topic = session.createTopic(TOPIC);
        /// 建立消費者
        MessageConsumer consumer = session.createConsumer(topic);
        //加入監聽者
        consumer.setMessageListener(new ConsumerSocket());

    }

    public static void main( String[] args ) throws InterruptedException, JMSException {
        //啟動消費者
        new App().msgTocp();

        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();

        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup,workGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.option(ChannelOption.SO_BACKLOG,128)
                // 有資料立即傳送
                .option(ChannelOption.TCP_NODELAY,true)
                // 長連線 建議不用 用心跳來保持長連結 ,免得浪費資源  經常測試 在nginx負載均衡下是這個長連結不起作用的
                .childOption(ChannelOption.SO_KEEPALIVE,true);
        sb.childHandler(new ChannelInitializerHandler("12345"));
        System.out.println("服務端開啟等待客戶端連線 ... ...");
        sb.bind(12345).sync().channel().closeFuture().sync();
    }
}

package test.mq;

import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import javax.jms.*;

/**
 * 啟動服務 B
 *
 */
public class App02

{


    public static void main( String[] args ) throws InterruptedException, JMSException {
        //啟動消費者
        new App().msgTocp();

        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();

        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup,workGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.option(ChannelOption.SO_BACKLOG,128)
                // 有資料立即傳送
                .option(ChannelOption.TCP_NODELAY,true)
                // 長連線 建議不用 用心跳來保持長連結 ,免得浪費資源  經常測試 在nginx負載均衡下是這個長連結不起作用的
                .childOption(ChannelOption.SO_KEEPALIVE,true);
        sb.childHandler(new ChannelInitializerHandler("12346"));
        System.out.println("服務端開啟等待客戶端連線 ... ...");
        sb.bind(12346).sync().channel().closeFuture().sync();
    }
}

以上是全部程式碼了 。接下是效果圖



這樣就大功告成了 ,

但是這樣架構是很危險的 提單一了 redis 和mq 如果我之間有寫過mq 的叢集加高可用了 請看這裡,redis 有空會研究一下叢集高可用才行 ,以上如果有雷同的地方,都是學習中 ,架構是來自百度的 不過自己之前也想過類似 最終是別人的部落格證實了我的想法了