1. 程式人生 > >java網路程式設計之Netty流資料的傳輸處理(五)

java網路程式設計之Netty流資料的傳輸處理(五)

Netty流資料的傳輸處理

Socket Buffer的缺陷

      對於例如TCP/IP這種基於流的傳輸協議實現,接收到的資料會被儲存在socket的接受緩衝區內。不幸的是,這種基於流的傳輸緩衝區並不是一個包佇列,而是一個位元組佇列。這意味著,即使你以兩個資料包的形式傳送了兩條訊息,作業系統卻不會把它們看成是兩條訊息,而僅僅是一個批次的位元組序列。因此,在這種情況下我們就無法保證收到的資料恰好就是遠端節點所傳送的資料。例如,讓我們假設一個作業系統的TCP/IP堆疊收到了三個資料包:

ABC DEF GHI

      由於這種流傳輸協議的普遍性質,在你的應用中有較高的可能會把這些資料讀取為另外一種形式:

ABCDE FG H I

      因此對於資料的接收方,不管是服務端還是客戶端,應當重構這些接收到的資料,讓其變成一種可讓你的應用邏輯易於理解的更有意義的資料結構。在上面所述的這個例子中,接收到的資料應當重構為下面的形式:

ABC DEF GHI

第一種解決方案(使用特殊字元分割)

Netty提供了一個分隔符類DelimiterBasedFrameDecoder(自定義分隔符)

下面的開發我是居於我的Netty第一個開發程式來講的,沒看過我的這篇文章可以先看看,想信你在Netty第一個開發程式會捕獲很多你想不到的知識。

服務端
public class Server {

    public static void main(String[] args) throws Exception{
        //1 建立2個執行緒,一個是負責接收客戶端的連線。一個是負責進行資料傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        //2 建立伺服器輔助類
        ServerBootstrap b = new
ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //1 設定特殊分隔符 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //2 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //3 設定字串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 繫結連線 ChannelFuture cf = b.bind(8765).sync(); //等待伺服器監聽埠關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }

關於EventLoopGroup、ServerBootstrap等等之類的我都在Netty的第一個程式都講得很清楚了,需要了解的可以參考我的第一篇文章。

程式碼說明:

1、 Unpooled.copiedBuffer(“$_”.getBytes()) 這個是設定特殊分隔符返回的是Netty中的ByteBuf型別這裡我設定的是 $_

2、DelimiterBasedFrameDecoder()是處理分隔符的類

3、StringDecoder() 設定字串形式的解碼

注意這裡使用了StringDecoder()解碼成字串形式,並不像在“Netty的第一個程式”那種方式去轉換成字串。

服務端業務處理
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response = "伺服器響應:" + msg + "$_";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

}

這裡沒什麼可說的!看過我的Netty的第一個程式這篇文章大家都懂。

由於在服務端就使用了StringDecoder()解碼成字串形式,這裡不需要用ByteBuf去轉換成字串。

客戶端

public class Client {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //1
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                //2
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //3
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });

        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888$_".getBytes()));


        //等待客戶端埠關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();

    }
}

由於這裡客戶端也接收服務端返回的資料所以也採用了與服務端一樣的處理方式。

如果你看過我的Netty的第一個程式文章,你會發現當時我是休眠1s再進行傳送另一條的。到這目前你應該也知道我什麼這樣做了吧!

客戶端業務處理

public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }


}

這裡沒什麼可說的!看過我的Netty的第一個程式這篇文章大家都懂。

好!到這第一種解決方案就編寫結束了,先啟動服務端,再啟動客戶端

客戶端列印如下:

這裡寫圖片描述

客戶端簽到後服務端的列印如下:

這裡寫圖片描述

第二種解決方案(定長)

Netty提供了一個定長類FixdeLengthFraneDecoder

使用這個定長的有個弊端:如果由多個欄位比如可變長度的欄位組成時這個時候並解決不了什麼問題,建議使用第一個解決方案。

FixdeLengthFraneDecoder的使用跟DelimiterBasedFrameDecoder差不多,由於程式碼都差不多一樣這裡我不做太多的說明。

服務端

public class Server {

    public static void main(String[] args) throws Exception{
        //建立2個執行緒,一個是負責接收客戶端的連線。一個是負責進行資料傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        //建立伺服器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //1  設定定長字串接收  
                sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
                //2  設定字串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });

        //4 繫結連線
        ChannelFuture cf = b.bind(8765).sync();

        //等待伺服器監聽埠關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();

    }

}

1、FixedLengthFrameDecoder(3) 這裡設定定長字串接收具體設定多長自己定

2、StringDecoder() 設定字串形式的解碼

服務端業務處理

public class ServerHandler extends ChannelHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response =  request ;
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }
}
服務端
public class Client {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });

        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888".getBytes()));

        //等待客戶端埠關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();

    }
}
客戶端業務處理
public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }


}

好!到這第二種解決方案就編寫結束了,先啟動服務端,再啟動客戶端

客戶端列印如下:

這裡寫圖片描述

客戶端簽到後服務端的列印如下:

這裡寫圖片描述

相關推薦

java網路程式設計Netty資料傳輸處理

Netty流資料的傳輸處理 Socket Buffer的缺陷       對於例如TCP/IP這種基於流的傳輸協議實現,接收到的資料會被儲存在socket的接受緩衝區內。不幸的是,這種基於流的傳輸緩衝區並不是一個包佇列,而是一個位元組佇列。這意味著,即使

java網路程式設計Netty實戰資料通訊

Netty最佳實戰資料通訊 1 分析       我們需要了解下在真正專案應用中如何去考虛Netty的使用,大體上對於一引數設定都是根據伺服器效能決定的。這個不是最主要的。       我們要考慮的問題是兩臺機器(甚至多臺)使用Netty的怎樣進行通訊,我

Java網路程式設計Netty拆包和黏包-yellowcong

Netty中,解決拆包和黏包中,解決方式有三種 1、在每個包尾部,定義分隔符,通過回車符號,或者其他符號來解決 2、通過定義每個包的大小,如果包不夠就空格填充 3、自定義協議的方式,將訊息分為訊息頭和訊息體,在訊息頭中表示出訊息的總長度,

C#網路程式設計---TCP協議的同步通訊

上一篇學習日記C#網路程式設計之--TCP協議(一)中以服務端接受客戶端的請求連線結尾 既然服務端已經與客戶端建立了連線,那麼溝通通道已經打通,載滿資料的小火車就可以彼此傳送和接收了。現在讓我們來看看資料的傳送與接收 先把服務端與客戶端的連線程式碼敲出來 服務端 IPAddress ip = new IP

網路程式設計即時通訊程式(聊天室)------通訊流程簡介及通訊協議定製

      在開始講之前,我想先跟大家描述一下,這個所謂的通訊程式具體是一個什麼樣的東西。該通訊程式類似一個弱版本的qq,登入時需要進行註冊,登入成功後,可以實現即時的通訊,群聊,私聊,同時還可傳檔案。先上個圖 服務端:                           

JAVA 併發程式設計-執行緒範圍內共享變數

執行緒範圍內共享變數要實現的效果為:多個物件間共享同一執行緒內的變數未實現執行緒共享變數的demo:package cn.itcast.heima2; import java.util.HashMap; import java.util.Map; import java.u

Java網路程式設計的詳解

前言 大部分網路程式做的事情就是接受輸入併產生輸出。讀伺服器傳送過來的資料與讀取本地檔案的資料並沒有多大的區別,同時伺服器將資料傳送給客戶端與寫資料到本地檔案也很像。 Java的IO操作基於streams實現的。輸入流讀資料,輸出流寫資料。 該系列文

Java網路程式設計Socket

原文博主禁止轉載,不過我還是希望把一些關鍵的地方筆記下來,閱讀請移步 原文 以下是學習之後的個人筆記 一、Socket通訊基本例項   通過伺服器-客戶端模式引入Socket通訊 伺服器端 package cn.itcast.net; import java.io

JAVA 網路程式設計(6) Netty TCP 示例

maven使用的netty版本如下: <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> &

Java網路程式設計URL和URI

URL和URI URL可以唯一地標識一個資源在Internet上的位置。URL是最常見的URI URI URI的結構: 模式:模式特定部分 常見的模式有: data file ftp http mailto magnet teln

JAVA 網路程式設計(7) Netty 處理Http協議 示例

maven中使用netty的版本為: <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> &

Java網路程式設計Socket通訊

       最近在學習Java網路程式設計,之前聽說過,但是一直都沒有認真瞭解過。這幾天突然來了興致,覺得很神奇,忽然就想要了解下具體是什麼個情況。         Socket通常也稱作"套接字",用於描述IP地址和埠,是一個通訊鏈的控制代碼。在Internet上的主機

Java網路程式設計URLEncode和URLDecode工具類

import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; public class EncodeTest {

JAVA網路程式設計模擬表單提交

這一篇部落格是對上一篇《JAVA網路程式設計之獲取網路資源》的擴充,這一篇將使用HttpURLConnection來模擬一個表單的提交。在B/S架構的系統中,請求時通過瀏覽器與服務端進行互動的,提交請求引數時使用form表單進行提交,但是有很多時候,我們需要在程

Java 網路程式設計 TCP協議

TCP協議 (伺服器端程先啟動,等待客戶端連線)   TCP協議是面向連線的通訊協議,即在傳輸資料前先在傳送端和接收端建立邏輯連線,然後再傳輸資料 保證傳輸資料的全性安,檔案資料不易丟失   在JDK中提供了兩個類用於實現TCP程式,一個是ServerSocket類,用於表示伺

java網路程式設計學習筆記 式套接字程式設計

tcp是Transmission Control Protocol即傳輸控制協議,是一種面性連線的協議。 在java中使用tcp程式設計需要用到兩個類 1.ServerSocket(代表伺服器) 2.Socket(代表客戶端) 伺服器端程式碼: //伺服器端在埠8888監聽 Serve

Java網路程式設計HttpURLConnection

從上圖,可知HttpURLConnection是URLConnection類的子類,先了解下URLConnection. URLConnection類: 描述: 是從Url建立的,並且用

java網路程式設計TFTP

** TFTP簡單檔案傳輸協議 ** TFTP使用了UDP套接字,效率比較高,但是也要求TFTP為資料傳輸的不可靠負責。 TFTP伺服器在69埠上監聽到來的資料包,客戶端使用一個隨機的埠號 TFTP作用:許多無盤工作站使用TFTP來載入它們需要的來

Java網路程式設計多執行緒Client-Server

前面廢話過了,現在就直接看程式碼吧! ThreadedClient.java package exercise01; import java.io.*; import java.net.*; public class ThreadedClient { privat

java網路程式設計下載檔案通過多執行緒分塊下載(二)

importjava.io.File; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConn