Netty4(三)多連線客戶端設計與實現
阿新 • • 發佈:2019-02-19
本文介紹多連線的netty客戶端設計
目標
Netty(二)一文中實現了單連線客戶端,也就是說客戶端只有一個連線,這就不利於高併發RPC的設計,本文嘗試設計一個多連線的客戶端,支援斷線重連
UML類圖
實現
多連線客戶端
package com.mym.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多連線客戶端
*/
public class MutilClient {
/**服務類*/
private Bootstrap bootstrap = new Bootstrap();
/**會話集合*/
private List<Channel> channels = new ArrayList<Channel>();
/**引用計數*/
private final AtomicInteger index = new AtomicInteger();
/**初始化*/
public void init(int count){
//worker
EventLoopGroup worker = new NioEventLoopGroup();
//設定工作執行緒
this.bootstrap.group(worker);
//初始化channel
bootstrap.channel(NioSocketChannel.class);
//設定handler管道
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new StringEncoder());
channel.pipeline().addLast(new ClientHandler());
}
});
//根據連線數建立連線
for(int i = 0;i < count;i++){
ChannelFuture channelFuture = bootstrap.connect("0.0.0.0",9099);
channels.add(channelFuture.channel());
}
}
/**獲取channel(會話)*/
public Channel nextChannel(){
return getFirstActiveChannel(0);
}
private Channel getFirstActiveChannel(int count) {
Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size()));
if(!channel.isActive()){
//重連
reconect(channel);
if(count > channels.size()){
throw new RuntimeException("no Idle channel!");
}
return getFirstActiveChannel(count + 1);
}
return channel;
}
/**重連*/
private void reconect(Channel channel) {
//此處可改為原子操作
synchronized(channel){
if(channels.indexOf(channel) == -1){
return ;
}
Channel newChannel = bootstrap.connect("0.0.0.0", 9099).channel();
channels.set(channels.indexOf(channel), newChannel);
System.out.println(channels.indexOf(channel) + "位置的channel成功進行重連!");
}
}
}
本類採用物件組的方式儲存連線。因為一個thread + 佇列 == 一個單執行緒執行緒池 是執行緒安全的,任務是線性序列執行的
客戶端handler
package com.mym.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client receive msg:"+msg.toString());
}
}
測試類
package com.mym.netty.client;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class StartClient {
public static void main(String[] args) {
mutilClient();
}
public static void mutilClient(){
MutilClient client = new MutilClient();
client.init(5);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while(true){
try {
System.out.println("請輸入:");
String msg = bufferedReader.readLine();
client.nextChannel().writeAndFlush(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
測試
測試步驟:連線服務端後,客戶端先向服務端傳送訊息,客戶端進行斷網,然後開網,然後再想服務端傳送訊息
客戶端輸出如下:
請輸入:
nihao
client receive msg:this is ServerHandler reply msg happend at !1531894695900this is ServerHandler2 reply msg happend at !1531894695902
此處斷網,一大堆錯。然後重新開網,再次傳送訊息
請輸入:
hello
-1位置的channel成功進行重連!
client receive msg:this is ServerHandler reply msg happend at !1531894961093
client receive msg:this is ServerHandler2 reply msg happend at !1531894961094
本次實現仍有可優化的地方,歡迎留言給出建議