1. 程式人生 > >多連線的客戶端示例

多連線的客戶端示例

Netty 4/5 說明: 目前  http://netty.io/ 釋出的最新版本號4.1.30.Final,但是並沒有netty5相關的版本釋出了

Netty3 模擬多連線的客戶端

package xss.netty.netty3.client;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import xss.netty.netty3.HelloClientHandler;

import java.net.InetSocketAddress;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *  多個連線的客戶端
 */
public class ClientMutliChannel {
    //客戶端連線服務類
    ClientBootstrap clientBootstrap=new ClientBootstrap();

    //會話列表
//    List<Channel> channelList=new ArrayList<Channel>();
    Vector<Channel> channels=new Vector<Channel>();


    //計數器
    private final AtomicInteger index =new AtomicInteger();

    /**
     * 初始化channel 的個數
     * @param channelCount
     */
    public void init(int channelCount){
        ExecutorService worker = Executors.newCachedThreadPool();
        ExecutorService boss = Executors.newCachedThreadPool();
        //設定Thread Factory
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss,worker));
        //set pipe line factory
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline channelPipeline= Channels.pipeline();
                channelPipeline.addLast("decoder",new StringDecoder());
                channelPipeline.addLast("encoder",new StringEncoder());
                channelPipeline.addLast("hello",new HelloClientHandler());
                return channelPipeline;
            }
        });
        //connet to server
        for(int count=1;count<channelCount;count++) {
            ChannelFuture channelFuture=clientBootstrap.connect(new InetSocketAddress("127.0.0.1",9999));
            channels.add(channelFuture.getChannel());
        }
        System.out.println("Init 完成");
    }

    public Channel getChannel(){
        return getActiveChannel(0);
    }

    /**
     * 返回可用的channel
     * @param count
     * @return
     */
    private Channel getActiveChannel(int count){
        Channel channel=channels.get(Math.abs(index.getAndIncrement() % channels.size()));
        if(!channel.isConnected()){
            reconnect(channel);
            if(count >= channels.size()){// 超出連線連線數的大小
                throw new RuntimeException("沒有可用的連線");
            }
            return getActiveChannel(count+1);//嘗試下一個
        }
        return channel;
    }

    /**
     * 重連
     * @param channel
     */
    private void reconnect(Channel channel){
        if(channels.indexOf(channel) == -1){
            return;
        }
        Channel reconnetChannel=clientBootstrap.connect(new InetSocketAddress("127.0.0.1",9999)).getChannel();
        channels.set(channels.indexOf(channel),reconnetChannel);
    }

}

相關問題: 是不是執行緒安全的? 粘包/拆包的問題?

後續驗證粘包與拆包的問題