解決socket負載均衡叢集方案和程式碼實現
有一段時間,在考慮下socket 之間叢集 可以在Nginx 下可以 但是不同伺服器之間怎麼通訊呢 後來自己也想可不可以用什麼東西或者中介軟體來通訊 ,後來在百度之下 發現果然就是按照我所想的 ,在網上看了一個方案,架構如下:
說一下這個方案他們之間的作用
redis : 存取使用者和伺服器的關係,如果A想發一條資料,是發給B ,這時A發起一個mq訂閱模式推送的資料 ,然後B收到資料處理好 ,在推送給使用者就ok
mq : 是了橋接A和B 的之間的通訊
這樣A和B之間通訊就沒有問題了
以下是搭建過程
首先配置好nginx ,可以百度下載和安裝 ,我的路徑是如下 開啟配置nginx.conf vim nginx.conf 配置如下
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream ws_name {
server 127.0.0.1:12345 weight=1; ## weight 權重越大越大獲取連結機會就越大
server 127.0.0.1:12346 weight=1;
}
server {
listen 9999;
server_namelocalhost;
location / {
proxy_pass http://ws_name/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
### 以下配置是為了解決在nginx 下 socket 短時間的斷開 ,即使你配置長連結也沒用 必須有如下的配置
proxy_connect_timeout 4s;
proxy_read_timeout 600s; #這個配置連線保持多長時間 這配置十分鐘 ,然後可以自己心跳來保證長連結
proxy_send_timeout 12s;
}
error_page 500 502 503 504/50x.html;
location = /50x.html {
root html;
}
以上是nginx的配置 然後啟動就ok 啟動 命令 :./nginx
檢視是否啟動 命令 ps -ef |grep nginx
然後 mq 我的mq是 activemq 這個可以百度看看怎麼安裝 ,因為這個安裝比較簡單所以就不說太多了
接下來是程式碼的實現了 :
package com.yw.socket;
import com.yw.mq.Producer;
import com.yw.redis.JedisUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import javax.jms.JMSException;
import java.util.Date;
import java.util.Iterator;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:29
**/
public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
private String URLSTR="ws://%s:%s/%s";
private String ws="";
private String port="";
private WebSocketServerHandshaker handshaker;
public SocketServerHandler(String url,String port,String name){
ws = String.format(URLSTR, url,port,name);
this.port=port;
}
public SocketServerHandler(String port,String name){
this("127.0.0.1",port,name);
}
public SocketServerHandler(String port){
this("127.0.0.1",port,"websocket");
}
public SocketServerHandler(){
};
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
System.out.println("handleHttpRequest FullHttpRequest");
///處理第一次連線過來的請求
handleHttpRequest(channelHandlerContext,(FullHttpRequest)msg);
} else if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(channelHandlerContext,(WebSocketFrame)msg);
}
}
/**
* 新增新的連線
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("有新的連線進來了額 " + ctx.channel().id() + " 連線時間是 " + new Date().toLocaleString());
//新的連線,儲存對應服務物件和和所在服務,這裡用 channel.id 當做key只是演示 真實不可,可以根據的自己業務需求去分析
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.setKey(ctx.channel().id().toString(),this.port);
JedisUtil.closeResource();
MapChannerlGlobal.GROUP.add(ctx.channel());
}
/**
* 刪除關閉連線
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
MapChannerlGlobal.GROUP.add(ctx.channel());
System.out.println("刪除該連結" + ctx.channel().id() + "斷開連線時間 " + new Date().toLocaleString());
//刪除對應key
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.delKeyString(ctx.channel().id().toString());
JedisUtil.closeResource();
}
/**
* 處理髮過來的資訊
* @param ctx
* @param frame
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){
System.out.println("handlerWebSocketFrame-->> "+frame+frame.getClass().getSimpleName());
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame);
System.out.println("關閉連線---CloseWebSocketFrame");
}
//不是二進位制的資料
if (!(frame instanceof TextWebSocketFrame)){
System.out.println("暫時只有二進位制資料處理");
return ;
}
String msg = ((TextWebSocketFrame) frame).text();
if(msg.indexOf(":")>=0){
String[] split = msg.split(":");
//獲取該使用者所在的伺服器
JedisUtil JedisUtil=new JedisUtil();
String cid = JedisUtil.getkeyStr(split[0]);
JedisUtil.closeResource();//關閉redis
//如果埠號不相同的話 說明該伺服器不在同一個 所以借用中介軟體推送
if(!cid.equals(this.port)){
try {
new Producer().sendTocp(msg);
} catch (JMSException e) {
e.printStackTrace();
}
}else{
///如果相同則直接推送
ChannelGroup group = MapChannerlGlobal.GROUP;
Iterator<Channel> iterator = group.iterator();
while(iterator.hasNext()){
if(cid.equals(iterator.next().id().toString())){
//推送資料
TextWebSocketFrame tws = new TextWebSocketFrame(split[1]);
iterator.next().writeAndFlush(tws);
}
}
}
return ;
}
//其他訊息
System.out.println("客戶端發過來的的資料: "+msg);
long time = new Date().getTime();
TextWebSocketFrame tws =new TextWebSocketFrame(" 這是伺服器回饋的資料 時間:"+ time +" 通道的 id " +ctx.channel().id() );
ctx.channel().writeAndFlush(tws);
}
/**
* 傳送響應回饋請求者
* @param ctx
* @param req
* @param resp
*/
private void sendHttpResponse(ChannelHandlerContext ctx ,FullHttpRequest req,DefaultFullHttpResponse resp){
if (resp.getStatus().code()!=200){
ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);
resp.content().writeBytes(buffer);
buffer.release();
}
//響應請求
ChannelFuture f=ctx.channel().writeAndFlush(resp);
if ( resp .getStatus().code()!=200){
f.addListener(ChannelFutureListener.CLOSE);///發起關閉
}
}
/**
* 處理髮過來的請求
* @param ctx
* @param req
*/
private void handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
if (req.getDecoderResult().isSuccess() && (!"websocket".equals(req.headers().get("Upgrade")))) {
///建立一個響應者
DefaultFullHttpResponse resp=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST,false);
sendHttpResponse(ctx,req,resp);
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(ws,null,false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 讀取完通道流 進行重新整理
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
ctx.flush();
}
}
package com.yw.socket;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:28
**/
public class MapChannerlGlobal {
public static ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
package com.yw.socket;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.util.Iterator;
/**
* @program: mqYw
* @description: 接受訊息佇列的資料
* @author: yw
* @create: 2018-07-06 21:58
**/
public class ConsumerSocket implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
ChannelGroup group = MapChannerlGlobal.GROUP;
TextWebSocketFrame tws = null;
Channel next = null;
Iterator<Channel> iterator = group.iterator();
try {
// 這是一個標誌 如果是 key:msg ,key 是Channel的id msg是訊息 演示用 真實中不能這楊 可以根據自己的需求去定這個key
if( textMessage.getText().toString().indexOf(":")>=0){
String[] strings = textMessage.getText().toString().split(":");
while (iterator.hasNext()) {
next = iterator.next();
///這裡用Channel的id當鍵值是不合理 只是用來演示而已
if(next.id().toString().equals(strings[0])){
try {
//推送資料
tws = new TextWebSocketFrame(textMessage.getText());
next.writeAndFlush(tws);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.yw.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.channel.socket.SocketChannel;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:37
**/
public class ChannelInitializerHandler extends ChannelInitializer<SocketChannel> {
public ChannelInitializerHandler() {
}
//埠號
private String port="";
public ChannelInitializerHandler(String port) {
this.port=port;
}
@Override
protected void initChannel(SocketChannel cl) throws Exception {
cl.pipeline().addLast("http-codec",new HttpServerCodec())
///接受資料長度
.addLast("aggregator",new HttpObjectAggregator(1024*1024))
// 設定當有新的模組時候要恢復傳輸
.addLast("http-chunked",new ChunkedWriteHandler())
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
//新增自己定義handler
.addLast("handler",new SocketServerHandler(port,"ts"));
}
}
package com.yw.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 22:28
**/
public class JedisTools {
private static JedisPool jedisPool = null;
///連線ip
private static final String ADDRIP="127.0.0.1";
//埠號
private static final Integer PORT = 6379;
///密碼
private static final String PWD="123456";
static {
if(jedisPool==null){
JedisPoolConfig config=new JedisPoolConfig();
//設定最大連線數 預設是8 , -1表示 無限
config.setMaxTotal(10);
//最多有多少個狀態為idle(空閒的)的jedis例項 預設是 8
config.setMaxIdle(10);
///等待可用連線的最大時間,單位毫秒,預設值為-1,表示永不超時
config.setMaxWaitMillis(300);
///在borrow一個jedis例項時,是否提前進行validate操作;如果為true,則得到的jedis例項均是可用的;
config.setTestOnBorrow(true);
jedisPool=new JedisPool(config,ADDRIP,PORT,3000);
}
}
public synchronized static Jedis getJedis(){
if(jedisPool!=null){
Jedis j= jedisPool.getResource();
return j;
}
return null;
}
}
package com.yw.redis;
import redis.clients.jedis.Jedis;
import java.io.*;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-07 12:46
**/
public class JedisUtil {
private Jedis jedis;
public JedisUtil(){
this.jedis = JedisTools.getJedis();
}
public void setKey(String key,Object obj){
jedis.set(key.getBytes(),objToByte(obj));
}
public Object getKeyObj(String key){
return byteToObj(jedis.get(key.getBytes()));
}
public void setKey(String key,String value){
jedis.set(key,value);
}
public String getkeyStr(String key){
return jedis.get(key);
}
public void closeResource(){
if(this.jedis!=null){
/// this.jedis.flushAll();
this.jedis.close();
}
}
public void delKeyString(String key){
this.jedis.del(key);
}
public void delKeyBytes(String key){
this.jedis.del(key.getBytes());
}
/**
* 序列化轉為obj
* @param
* @return
*/
public Object byteToObj(byte[] bt){
Object obj=null;
if(bt==null) return null;
ByteArrayInputStream bis=null;
ObjectInputStream ois=null;
try {
bis=new ByteArrayInputStream(bt);
ois=new ObjectInputStream(bis);
obj=ois.readObject();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(bis!=null){
bis.close();
}
if(ois!=null){
ois.close();
}
} catch (IOException e1) {
System.out.println(e1);
}
}
return obj;
}
/**
* obj 序列化
* @param obj
* @return
*/
public byte[] objToByte(Object obj){
if(obj==null) return null;
byte [] bt=null;
ByteArrayOutputStream bos=null;
ObjectOutputStream oos=null;
try {
bos=new ByteArrayOutputStream();
oos=new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.flush();
bt=bos.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(bos!=null){
bos.close();
}
if(oos!=null){
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return bt;
}
public static void main(String[] args) {
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.setKey("test","123");
System.out.println(JedisUtil.getkeyStr("test"));
JedisUtil.closeResource();
}
}
public class Producer {
private static final String TOPIC ="test-topic";
//訂閱模式
public void sendTocp(String msg) throws JMSException {
//TOPIC
Connection connection = ConnectionUtil.getConnection(URL);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC);
MessageProducer producer = session.createProducer(topic);
TextMessage textMessage=null;
textMessage = session.createTextMessage(msg);
producer.send(textMessage);
producer.close();
session.close();
connection.close();
}
}
package test.mq;
import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import com.yw.socket.SocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.jms.*;
/**
* 啟動 服務A
*
*/
public class App
{
private static final String TOPIC ="test-topic";
private static final String URL ="tcp://127.0.0.1:61616";
private static final String QUEUENAME ="test-queue";
public void msgTocp() throws JMSException {
//獲取連線
Connection connection = ConnectionUtil.getConnection(URL);
///建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//訂閱模式
Topic topic = session.createTopic(TOPIC);
/// 建立消費者
MessageConsumer consumer = session.createConsumer(topic);
//加入監聽者
consumer.setMessageListener(new ConsumerSocket());
}
public static void main( String[] args ) throws InterruptedException, JMSException {
//啟動消費者
new App().msgTocp();
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workGroup);
sb.channel(NioServerSocketChannel.class);
sb.option(ChannelOption.SO_BACKLOG,128)
// 有資料立即傳送
.option(ChannelOption.TCP_NODELAY,true)
// 長連線 建議不用 用心跳來保持長連結 ,免得浪費資源 經常測試 在nginx負載均衡下是這個長連結不起作用的
.childOption(ChannelOption.SO_KEEPALIVE,true);
sb.childHandler(new ChannelInitializerHandler("12345"));
System.out.println("服務端開啟等待客戶端連線 ... ...");
sb.bind(12345).sync().channel().closeFuture().sync();
}
}
package test.mq;
import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.jms.*;
/**
* 啟動服務 B
*
*/
public class App02
{
public static void main( String[] args ) throws InterruptedException, JMSException {
//啟動消費者
new App().msgTocp();
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workGroup);
sb.channel(NioServerSocketChannel.class);
sb.option(ChannelOption.SO_BACKLOG,128)
// 有資料立即傳送
.option(ChannelOption.TCP_NODELAY,true)
// 長連線 建議不用 用心跳來保持長連結 ,免得浪費資源 經常測試 在nginx負載均衡下是這個長連結不起作用的
.childOption(ChannelOption.SO_KEEPALIVE,true);
sb.childHandler(new ChannelInitializerHandler("12346"));
System.out.println("服務端開啟等待客戶端連線 ... ...");
sb.bind(12346).sync().channel().closeFuture().sync();
}
}
以上是全部程式碼了 。接下是效果圖
這樣就大功告成了 ,
但是這樣架構是很危險的 提單一了 redis 和mq 如果我之間有寫過mq 的叢集加高可用了 請看這裡,redis 有空會研究一下叢集高可用才行 ,以上如果有雷同的地方,都是學習中 ,架構是來自百度的 不過自己之前也想過類似 最終是別人的部落格證實了我的想法了