1. 程式人生 > >Netty自帶連線池的使用

Netty自帶連線池的使用

一、類介紹
1.ChannelPool——連線池介面

2.SimpleChannelPool——實現ChannelPool介面,簡單的連線池實現

3.FixedChannelPool——繼承SimpleChannelPool,有大小限制的連線池實現

4.ChannelPoolMap——管理host與連線池對映的介面

5.AbstractChannelPoolMap——抽象類,實現ChannelPoolMap介面

二、具體使用
a、MyNettyPool——Netty自帶連線池的用法

package com.dxfx.netty.demo;

import com.alibaba.fastjson.JSONObject;
import com.dxfx.netty.framework.Constants; import com.dxfx.netty.framework.DefaultFuture; import com.dxfx.netty.framework.NettyClientHandler; import com.dxfx.netty.param.RequestParam; import com.dxfx.netty.param.Response; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.AbstractChannelPoolMap; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolMap; import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; /** * Netty自帶連線池的用法 * * @author Administrator * */ public class MyNettyPool { // key為目標host,value為目標host的連線池 public static ChannelPoolMap<String, FixedChannelPool> poolMap; private static final Bootstrap bootstrap = new Bootstrap(); static { bootstrap.group(new NioEventLoopGroup()); bootstrap.channel(NioSocketChannel.class); bootstrap.remoteAddress("localhost", 8080); } public MyNettyPool() { init(); } /** * netty連線池使用 * */ public void init() { poolMap = new AbstractChannelPoolMap<String, FixedChannelPool>() { @Override protected FixedChannelPool newPool(String key) { ChannelPoolHandler handler = new ChannelPoolHandler() { /** * 使用完channel需要釋放才能放入連線池 * */ @Override public void channelReleased(Channel ch) throws Exception { // 重新整理管道里的資料 // ch.writeAndFlush(Unpooled.EMPTY_BUFFER); // flush掉所有寫回的資料 System.out.println("channelReleased......"); } /** * 當連結建立的時候新增channelhandler,只有當channel不足時會建立,但不會超過限制的最大channel數 * */ @Override public void channelCreated(Channel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0])); ch.pipeline().addLast(new StringDecoder()); // 繫結channel的handler ch.pipeline().addLast(new NettyClientHandler()); } /** * 獲取連線池中的channel * */ @Override public void channelAcquired(Channel ch) throws Exception { System.out.println("channelAcquired......"); } }; return new FixedChannelPool(bootstrap, handler, 50); //單個host連線池大小 } }; } /** * 傳送請求 * * @param msg * 請求引數 * @param command * 請求方法 * @return */ public Response send(final Object msg, final String command) { //封裝請求資料 final RequestParam request = new RequestParam(); request.setCommand(command); request.setContent(msg); //從連線池中獲取連線 final FixedChannelPool pool = poolMap.get("localhost"); //申請連線,沒有申請到或者網路斷開,返回null Future<Channel> future = pool.acquire(); future.addListener(new FutureListener<Channel>() { @Override public void operationComplete(Future<Channel> future) throws Exception { //給服務端傳送資料 Channel channel = future.getNow(); channel.write(JSONObject.toJSONString(request)); channel.writeAndFlush(Constants.DELIMITER); System.out.println(channel.id()); // 連線放回連線池,這裡一定記得放回去 pool.release(channel); } }); //獲取服務端返回的資料 DefaultFuture defaultFuture = new DefaultFuture(request); return defaultFuture.get(); } }

 

b、MyNettyPoolTest——Netty自帶連線池測試類,SpringServer為連線池啟動類

package com.dxfx.netty.demo;

import com.dxfx.netty.param.Response;
import com.dxfx.user.model.User;

/**
 * Netty自帶連線池測試類,SpringServer為連線池啟動類
 * 
 * @author Administrator
 *
 */
public class MyNettyPoolTest {

    public static void main(String[] args) {
        User user = new User();
        user.setAge(12);
        user.setId(23);
        user.setName("client");
        String command = "login";
        
        MyNettyPool pool = new MyNettyPool();
        new MyThread(pool, user, command).start();
        new MyThread(pool, user, command).start();
        new MyThread(pool, user, command).start();
        new MyThread(pool, user, command).start();
        for (int i = 0; i < 50000; i++) {
            new MyThread(pool, user, command).start();
        }
    }

}

class MyThread extends Thread {

    public MyNettyPool pool;
    public Object msg;
    public String command;

    public MyThread(MyNettyPool pool, Object msg, String command) {
        super();
        this.pool = pool;
        this.msg = msg;
        this.command = command;
    }

    @Override
    public void run() {
        Response response = pool.send(msg, command);
        //System.out.println(response);
    }

}