1. 程式人生 > >基於netty框架的Socket傳輸

基於netty框架的Socket傳輸

報文 行業 initial factory 標準 進一步 壓測 hello bin

一、Netty框架介紹

什麽是netty?先看下百度百科的解釋:

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。 也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。 “快速”和“簡單”並不用產生維護性或性能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各種二進制,文本協議,並經過相當精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性

為什麽好多大公司都在使用netty框架?主要是基於netty框架的以下幾個特點決定的:

1)健壯性,2)功能齊全,3)可定制,4)擴展性

二、框架優點

傳統的RPC性能差,主要是由於客戶端和遠程調用采用了同步阻塞IO線程,當客戶端的並發壓力增大後,同步阻塞會由於頻繁的等待導致I/O線程堵塞,線程無法高效的工作,IO處理能力自然會降低。影響性能的三個因素:第一,IO模型,IO模型在一定程度上決定了框架的性能。第二、協議,如:HTTP、TCP/IP等,協議選擇的不同,性能模型也不同,通常情況下,內部私有協議的性能比較優,這是由於內部設計決定的。第三、線程,數據報文的接收、讀取、編碼、解碼等,線程模型的不同,性能也不同。相比於傳統的RPC框架,netty的優點主要體現在以下幾個方面:

  1. API使用簡單,封裝非常完善,開發門檻低
  2. 功能上強大,預置了多種編碼解碼功能,多種協議支持
  3. 定制能力強,可以對ChannelHandler對通信框架靈活擴展
  4. 性能高,Reactor線程模型調度,ChannelFuture-Listener,通過Listener機制主動推送結果
  5. 版本成熟穩定,社區活躍,版本更新快,出現的Bug會被很快的修復,同時,有心功能的加入,經歷了大規模的商業應用考驗,質量的到了充分的驗證。已經廣泛應用到互聯網、大數據、企業應用、電信軟件、網絡遊戲等熱門行業,他可以滿足不同的商業標準。

三、Netty架構分析

Netty是一個基於三層網絡架構模型的框架,三層網絡架構分析包括調度層、鏈條傳遞層以及業務邏輯層。

  1. Reactor通信調度層,是一個模型,

NIO線程池組件{

監聽網絡讀寫連接

業務調度處理

NIO,AIO,配合NIO通道NioSocketChannel組件

}

Netty通過內部select巡查機制,能夠實現IO多路復用,通過把多個IO阻塞復用到同一個select的阻塞上,從而能夠使系統即使在單線程的情況下,也能夠同時處理多個請求。這樣就使得netty實現了IO多路復用的優勢,與傳統多線程相比,大大減少了系統的開銷,因為系統不必創建新的線程和銷毀線程了,減少了系統的維護難度,節省了資源。

ByteBuffer池化支持,不用手動切換標誌位,實現零拷貝。傳統的Socket讀寫,基本是使用堆內存進行,即jvm事先會把堆內存拷貝到內存中,然後再寫入Socket,而netty采用的是DIRECT BUFFERS,不需要經過jvm內存拷貝,在堆外內存直接進行Socket讀寫,這樣就少了一次緩沖區的內存拷貝,從而實現零拷貝。

2.Pipleline職責鏈條傳遞

攔截處理向前向後事件,外部傳入的消息包對象,有POJO信息抽象,上層也只需要處理邏輯,類似SpringIOC處理BeanDefince。不同的Handler節點的功能也不同,通常情況下需要編碼解碼等,它可以完成外部協議到內部POJO對象的轉化,這樣上層只需要關註業務邏輯,不需要知道底層的協議和線程模型,從而實現解耦。

3.構建邏輯業務處理單元

底層的協議隔離,上層處理邏輯框架並不需要關心底層協議是什麽。Netty框架的分層設計使得開發人員不需要關註協議框架的實現,只需要關註服務層的業務邏輯開發即可,實現了簡單化。

之前有個項目是基於傳統Socket和線程池的技術實現的,但是在高並發的時候發現並發能力不足,壓測的時候發現TPS達不到理想值,所以經過考慮,決定使用netty框架來解決此問題。同樣,netty框架也分為客戶端和服務端,經過整理,先寫一個demo初探netty框架,下面是代碼的實現過程。

首先是服務端,服務端包含兩個方面,第一、服務端Server的主要作用就是通過輔助引導程序,設置NIO的連接方式處理客戶端請求,通過綁定特定端口、設定解碼方式以及監聽來實現整個線程的處理請求;第二、服務端Handler需要繼承ChannelInboundHandlerAdapter類,handler類的主要作用是讀取客戶端數據,處理業務,拋出異常,響應客戶端請求。代碼如下:

服務端Server:

技術分享圖片
public class Server {

private static Log logger = LogFactory.getLog(Server.class);

private int port;

public Server(int port) {

        super();

        this.port = port;

}

public  void start(){

        ServerBootstrap b = new ServerBootstrap();//引導輔助程序

        EventLoopGroup group = new NioEventLoopGroup();//通過nio方式來接收連接和處理請求

        try {

               b.group(group);

               b.channel(NioServerSocketChannel.class);//設置nio類型的channnel

               b.localAddress(new InetSocketAddress(port));//設置監聽端口

               //b.option(ChannelOption.SO_BACKLOG, 2048);

               b.childHandler(new ChannelInitializer<SocketChannel>() {//有連接到達時會創建一個channel

                      @Override

                      protected void initChannel(SocketChannel ch) throws Exception {

                             //註冊handler

                             ch.pipeline().addLast(new ByteArrayDecoder());

                             ch.pipeline().addLast(new ByteArrayEncoder());

                             ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));

                             ch.pipeline().addLast(new ServerHandler());

                            

                      }

               });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true);

               ChannelFuture f = b.bind().sync();//配置完成,開始綁定server,通過調用sync同步方法阻塞直到綁定成功

               logger.info(Server.class.getName()+"開始監聽:"+f.channel().localAddress());

               f.channel().closeFuture().sync();//應用程序會一直等待直到channel關閉

        } catch (Exception e) {

               e.printStackTrace();

        } finally {

               try {

                      //關閉EventLoopGroup,釋放掉所有資源包括創建的線程

                      group.shutdownGracefully().sync();

               } catch (InterruptedException e) {

                      e.printStackTrace();

               }

        }

}

}
View Code

服務端Handler

技術分享圖片
public class ServerHandler extends ChannelInboundHandlerAdapter {

private static Log logger=LogFactory.getLog(ServerHandler.class);

@Override

public void channelActive(ChannelHandlerContext ctx){

        logger.info(ctx.channel().localAddress().toString()+"通道活躍....");

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.error(ctx.channel().localAddress().toString()+"通道不活躍....");

}

/**

 *

 * 讀取客戶端傳過來的消息

 */

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //業務處理類

        logger.info("開始業務處理....");

        new SocketController(ctx,msg).run();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        //出現異常,關閉連

        logger.error("服務端出現異常:"+cause.getMessage(),cause);

        ctx.close();

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        logger.info("服務端完成請求!");

        ctx.flush();

}

}
View Code

客戶端代碼

客戶端主要是用來向服務端發送數據,同樣包含兩個方面,第一、Client主要通過設定端口和IP和服務器建立連接,進行數據包的編碼;第二、ClientHandler 需要繼承 SimpleChannelInboundHandler<ByteBuf>類,針對不同的傳輸方式,繼承不同的類,handler類同樣處理業務請求,響應服務端的請求。代碼如下:

客戶端Client:

技術分享圖片
public class Client {
    private static Log logger=LogFactory.getLog(Client.class);
    private String host;
    private int port;
    public Client(String host, int port) {
        super();
        this.host = host;
        this.port = port;
    }
    public void connect(){
        EventLoopGroup workGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(workGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                logger.info("客戶端觸發連接......");
                ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new ClientHandler());
            }
        });
        //客戶端開始連接
        try {
            logger.info("連接到服務器......");
            ChannelFuture future=bootstrap.connect(host,port).sync();
            //等待連接關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            workGroup.shutdownGracefully();
        }
    }
}
View Code

客戶端Handler:

技術分享圖片
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static Log logger=LogFactory.getLog(ClientHandler.class);
    /**
     * 向服務端發送消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().localAddress().toString()+"客戶點活躍...");
        //向服務端寫字符串
        logger.info("客戶端連接服務端,開始發送數據.....");
        String string ="hello server!";
        System.out.println("發送數據為:"+string);
        ByteBuf buf=ctx.alloc().buffer(4*string.length());
        buf.writeBytes(string.getBytes());
        ctx.writeAndFlush(buf);
        logger.info("發送完畢...");
    }
    
    /**
     * 讀取服務端返回來的消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception {
        logger.info("開始接受服務端數據");
        byte[] b=new byte[in.readableBytes()];
        in.readBytes(b);
        String string=new String(b);
        logger.info("服務端發送的數據為:"+string);
        in.release();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.info("客戶端異常:"+cause.getMessage(),cause);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("客戶端完成請求....");
        ctx.flush();    
    }
}
View Code

服務端啟動:

技術分享圖片
public class ServerMain {

private static Log logger=LogFactory.getLog(ServerMain.class);

private static Server server =new Server(55550);

public static void main(String[] args) {

        logger.info("服務端啟動.......");

        server.start();

}

}
View Code

客戶端啟動類:

技術分享圖片
public class Test {

private static Client client = new Client("127.0.0.1", 55550);

public static void main(String[] args) throws UnknownHostException, IOException {

        client.connect();

}

}
View Code

測試結果:

服務端:

技術分享圖片

客戶端:

技術分享圖片

總結:

以上只是一個netty框架初探的小Demo,學習使用netty框架的開始,這裏面涉及到了很多的技術以及非常多的組件,比如:Channels、Callbacks、Futures、Events和handlers等等,需要進一步的學習,另外,消息的編碼解碼、粘包、拆包的方式方法、消息格式的轉換以及報文格式大小限制都需要進一步的研究學習。

基於netty框架的Socket傳輸