1. 程式人生 > >[netty]-訊息編解碼之Java原生序列化

[netty]-訊息編解碼之Java原生序列化

訊息物件在網路上傳輸時,我們往往要對訊息進行編解碼。現在編解碼技術非常多,包括Java原生的序列化、Google的protoBuf、hessian等等。這一篇部落格我們主要介紹Java的原生序列化編解碼以及其優缺點。

基於Java提供的物件輸入/輸出流 ObjectInputStream和ObjectOutputStream可以直接將物件序列化為可儲存的位元組陣列寫入檔案或則是在網路上傳輸。Java物件的序列化目的一般只有兩個:

  1. 網路傳輸;
  2. 物件持久化。

1. Java序列化的缺點

Java序列化從JDK1.1 就已經提供,不需要新增其他的任何依賴庫就可以實現,只需要實現Serializable介面,並生成唯一序列化ID就可以序列化。但是Java原生序列化有諸多的缺點,導致在實際的生產環境中基本都是用其餘開源的編解碼框架。下面我們來看看Java原生的序列化有什麼缺點:

(1) 無法跨語言:這也是最致命的缺陷,導致無法再異構系統中使用。

(2)序列化之後碼流太大:序列化之後佔用太大的位元組。

(3)序列化效能太低:序列化比較佔用CPU的時間。

2.業界主流的編解碼框架:

由於Java序列化的表現差強人意,所以業界推出了很多高效能的編解碼框架,比如Google的Protobuf, hessian等等。這篇博文主要說明Java原生序列化的應用。

3. netty中使用Java序列化編解碼

在不考慮跨語言呼叫和效能時,Java序列化任然是首選的機制,因為原生的API使用起來十分方便。這一節主要包括:

  1. netty序列化服務端程式設計
  2. netty序列化客戶端程式設計
  3. 應用例項

1. 服務端程式設計

應用場景:客戶端傳送使用者訂購請求訊息,然後服務端接收到請求訊息物件後處理,然後傳送響應物件給客戶端。請求訊息和效應訊息的物件定義如下:
request:

package netty.quanwei.p7;

import java.io.Serializable;

/**
 * Created by louyuting on 17/2/1.
 *
 */
public class SubscribeReq implements Serializable{
    private String messageID;

    private String userName;

    private
String productName; private String phone; private String address; public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public String getMessageID() { return messageID; } public void setMessageID(String messageID) { this.messageID = messageID; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } } @Override public String toString() { return "SubscribeReq: [messageID]:"+ messageID + " [userName]:" +userName + " [productName]:" +productName+ " [phone]:" +phone+ " [address]:" +address; }

response:

package netty.quanwei.p7;

import java.io.Serializable;

/**
 * Created by louyuting on 17/2/1.
 */
public class SubscribeResp implements Serializable {
    private String messageID;

    private String respCode;

    private String description;

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getMessageID() {
        return messageID;
    }

    public void setMessageID(String messageID) {
        this.messageID = messageID;
    }

    public String getRespCode() {
        return respCode;
    }

    public void setRespCode(String respCode) {
        this.respCode = respCode;
    }
}
@Override
public String toString() {
    return "SubscribeReq: [messageID]:"+ messageID + " [respCode]:" +respCode
            + " [description]:" +description;
}

這裡我們使用netty提供的ObjectDecoder和ObjectEncoder來進行編解碼。

服務端的主函式編碼如下:

package netty.quanwei.p7;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqServer {

    public void bind(int port) throws Exception{
        //配置服務端NIO 執行緒組
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap();

        try {
            server.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /**
                             * 解碼器: 構造器傳入了兩個引數: #1 單個物件序列化後最大位元組長度,這是設定是1M;
                             *                           #2 類解析器: weakCachingConcurrentResolver建立執行緒安全的WeakReferenceMa對類載入器進行快取,
                             *                                      支援多執行緒併發訪問,當虛擬機器記憶體不足時,會釋放快取中的記憶體,防止記憶體洩漏.
                             */
                            ch.pipeline().addLast(new ObjectDecoder(1024*1024,
                                    ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );

                            ch.pipeline().addLast(new ObjectEncoder());

                            ch.pipeline().addLast(new SubreqServerHandler());
                        }
                    });

            //繫結埠, 同步等待成功
            ChannelFuture future = server.bind(port).sync();

            //等待服務端監聽埠關閉
            future.channel().closeFuture().sync();
        } finally {
            //優雅關閉 執行緒組
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * main 函式
     * @param args
     */
    public static void main(String[] args) {
        SubreqServer server = new SubreqServer();
        try {
            server.bind(18888);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在handler容器中增加了一個ObjectDecoder,負責對實現了序列化介面的POJO進行解碼,它有多個建構函式,支援不同的類解析器。這裡我們使用weakCachingConcurrentResolver建立執行緒安全的WeakReferenceMa對類載入器進行快取,支援多執行緒併發訪問,當虛擬機器記憶體不足時,會釋放快取中的記憶體,防止記憶體洩漏。 此外還設定了單個物件序列化後最大位元組長度,這是設定是1M。

最後訂購訊息在SubreqServerHandler中處理:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("Server -> read");

        SubscribeReq req = (SubscribeReq)msg;

        if( "louyuting".equalsIgnoreCase(req.getUserName()) ){
            System.out.println("service accept client subscript req :[\n"+ req.toString() +"]");

            ctx.writeAndFlush( resp(req.getMessageID()) );
        }
    }

    private SubscribeResp resp(String reqID){
        SubscribeResp response = new SubscribeResp();
        response.setMessageID(reqID);
        response.setRespCode("0");
        response.setDescription("subscribe is success book will arrive after 3 days");
        return response;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("Server -> read complete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //釋放資源
        ctx.close();
    }
}

客戶端程式開發

客戶端核心程式碼思路是:
1)在客戶端將netty提供的編解碼器新增到ChannelPipeline

2)在客戶端鏈路被啟用的時候傳送10條訂購訊息,為了檢驗netty提供的Java序列化功能是否支援TCP的黏包/拆包功能,客戶端一次性構造10條訂購訊息並一次性發送給伺服器,看伺服器能夠成功反序列化。

3) 客戶端接收到服務端的反饋訊息,列印。

客戶端核心程式碼:

package netty.quanwei.p7;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created by louyuting on 17/1/31.
 * netty 時間伺服器 客戶端
 */
public class SubreqClient {

    public void connect(int port, String host) throws Exception{
        //配置客戶端NIO 執行緒組
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();

        try {
            client.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ObjectDecoder(1024,
                                    ClassResolvers.cacheDisabled(this.getClass().getClassLoader())) );

                            ch.pipeline().addLast(new ObjectEncoder());
                            ch.pipeline().addLast(new SubreqClientHandler());
                        }
                    });

            //繫結埠, 非同步連線操作
            ChannelFuture future = client.connect(host, port).sync();

            //等待客戶端連線埠關閉
            future.channel().closeFuture().sync();
        } finally {
            //優雅關閉 執行緒組
            group.shutdownGracefully();
        }
    }

    /**
     * main 函式
     * @param args
     */
    public static void main(String[] args) {
        SubreqClient client = new SubreqClient();
        try {
            client.connect(18888, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在ObjectDecoder建構函式中配置不允許對類載入器快取。下面再看SubreqClientHandler的實現:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqClientHandler extends ChannelInboundHandlerAdapter{

    public SubreqClientHandler() {
    }

    /**
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("client -> active");

        for(int i=0; i<10; i++){
            ctx.write(subReq(String.valueOf(i)));
        }

        // 寫入10個物件到傳送緩衝之後再一次性 flush寫入通道
        ctx.flush();
    }


    private SubscribeReq subReq(String id){
        SubscribeReq req = new SubscribeReq();
        req.setMessageID(id);
        req.setUserName("louyuting");
        req.setProductName("iphone 7");
        req.setPhone("13026317652");
        req.setAddress("HUST");

        return req;
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("client -> read");


        LogUtil.log_debug("receive server response: { " + msg.toString() +"]");

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

在鏈路啟用時,即channelActive()函式中一次性構建10條訊息,然後一次傳送給伺服器。

執行結果

整個執行流程就是:
客戶端先構建訊息物件–》

在客戶端與服務端連線上時,一次性發送10個物件給服務端,這裡傳送時在客戶端會經過ObjectEncoder編碼為位元組資料–》

服務端接收到位元組陣列,先經過ObjectDecoder解碼成為實際的Java物件(這裡的ObjectDecoder需要傳入類解析器引數,類解析器引數也要傳入載入器引數),然後服務端的handler處理後染回response物件給客戶端,返回response物件在服務端也會被ObjectEncoder編碼—》

客戶端收到響應,也是先解碼,然後回去響應訊息,並處理。

以上基本上就是完整的流程,我們看看執行結果:
服務端:

2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:0 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:1 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:2 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:3 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:4 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:5 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:6 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:7 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:8 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:9 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read complete

收到了完整的10個物件:儘管客戶端一次批量傳送了10條訂購請求訊息,TCP會對訊息進行拆包和黏包,但是並不影響最後的執行結果,服務端成功接收到了10條請求訂購的訊息,與客戶端一致。

客戶端執行結果如下:

2017-02-01 12:41:17:client -> active
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:0 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:1 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:2 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:3 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:4 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:5 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:6 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:7 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:8 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:9 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]

客戶端也收到了10條反饋資訊。

相關推薦

[netty]-訊息解碼Java原生序列

訊息物件在網路上傳輸時,我們往往要對訊息進行編解碼。現在編解碼技術非常多,包括Java原生的序列化、Google的protoBuf、hessian等等。這一篇部落格我們主要介紹Java的原生序列化編解碼以及其優缺點。 基於Java提供的物件輸入/輸出流

netty解碼java原生序列

netty序列化之java原生序列化 前幾天有空看了下netty使用java原生序列化、以及使用protobuf、jboss marshalling進行編解碼,各種技術之間差異挺大,使用的方法各自不同,效能上原生的效能優勢確實不大,但是另外兩種確實很有優勢,覺得有點意思,於是準

Netty(三)——解碼

       我們在開發中經常會把Java類進行implements Serializable用來網路傳輸的序列化和反序列化,過程其實就是將Java物件轉編碼為位元組陣列或者ByteBuffer物件進行傳輸,當遠端服務讀取到ByteBuffer物件或者位元組陣列時,需要將其解

java基礎java序列

什麼是java的序列化? 物件序列化機制(object serialization)是Java語言內建的一種物件持久化方式,通過物件序列化,可以把物件的狀態儲存為位元組陣列,並且可以在有需要的時候將這個位元組陣列通過反序列化的方式再轉換成物件 用自己的話講來

java原生序列慢在哪裡?

Java原生序列化和二進位制序列化效能比較 序列化速度 package com.clq.netty.serializable; import java.io.ByteArrayOutputStream; import java.io.IOExcep

物件序列——java原生序列、Kryo序列效能比較和Hessian序列

什麼是序列化以特定的方式對類例項的瞬時狀態進行編碼儲存的一種操作,叫做物件序列化。就是將物件的這個時刻的各種屬性各種值按照一定的規則變成二進位制流,然後如果傳輸到別的jvm中,jvm可以按照規則在將二進位制流反序列化成對應的物件,並且物件裡面還有當時的資料和各種屬性。序列化的

java架構路-(netty專題)netty解碼(出入戰)與粘包拆包

上次迴歸:   上次部落格我們主要說了netty的基本使用,都是一些固定的模式去寫的,我們只需要關注我們的攔截器怎麼去寫就可以了,然後我們用我們的基礎示例,改造了一個簡單的聊天室程式,可以看到內部加了一個StringEncoder和StringDecoder,這個就是用來編解碼我們字串的,這次我們就來說說這個

netty解碼jboss marshalling

netty編解碼之jboss marshalling jboss marshalling是jboss內部的一個序列化框架,速度也十分快,這裡netty也提供了支援,使用十分方便,不需要像protobuf一樣編寫proto檔案,只需要提供兩個編解碼器即可,以下就是jboss ma

netty 解碼 Messagepack

首先說一下需要注意的問題1:傳送端的javabean 一定要有註解 @Message。這個註解要加在類上。使用@Message 可以標記被序列化的類。類中所有成員都會被序列化。 否則javabean不會被序列化,也就接收不到了。接收端的javabean可以不加這個註解。習慣性

十、解碼技術--Java序列

Java自身的序列化機制,就是隻需要序列化的POJO物件實現Java.io.Serializable介面,根據實際情況生成序列ID,這個類就能夠通過java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。 服務端:N

Java基礎對象序列

處理 exc 單例 color 直接 tro 生命 found public 1. 什麽是Java對象序列化 Java平臺允許我們在內存中創建可復用的Java對象,但一般情況下,只有當JVM處於運行時,這些對象才可能存在,即,這些對象的生命周期不會比JVM的生命周期更

轉多線程設計模式 - Future模式JAVA原生實現

pre 復雜 業務邏輯 inter real 保存 如何 http adp  在之前一篇博客中介紹了Future設計模式的設計思想以及具體實現,今天我們來講一下使用JDK原生的包如何實現。   JDK內置的Future主要使用到了Callable接口和FutureTask類

並發 Java 三把鎖

設計原理 由於 hand timeunit 訪問 什麽是 inter 提高 指令 前言 今天我們繼續學習並發。在之前我們學習了 JMM 的知識,知道了在並發編程中,為了保證線程的安全性,需要保證線程的原子性,可見性,有序性。其中,synchronized 高頻出現,因為他既

即時通訊音視訊開發(四):視訊解碼預測技術介紹

前言 即時通訊應用中的實時音視訊技術,幾乎是IM開發中的最後一道高牆。原因在於:實時音視訊技術 = 音視訊處理技術 + 網路傳輸技術 的橫向技術應用集合體,而公共網際網路不是為了實時通訊設計的。 系列文章 《即時通訊音視訊開發(二):視訊編解碼之數字視訊介紹》 《即時通訊音

即時通訊音視訊開發(三):視訊解碼編碼基礎

前言 即時通訊應用中的實時音視訊技術,幾乎是IM開發中的最後一道高牆。原因在於:實時音視訊技術 = 音視訊處理技術 + 網路傳輸技術 的橫向技術應用集合體,而公共網際網路不是為了實時通訊設計的。 系列文章 《即時通訊音視訊開發(一):視訊編解碼之理論概述》 《即時通訊音視訊

即時通訊音視訊開發(二):視訊解碼數字視訊介紹

前言 即時通訊應用中的實時音視訊技術,幾乎是IM開發中的最後一道高牆。原因在於:實時音視訊技術 = 音視訊處理技術 + 網路傳輸技術 的橫向技術應用集合體,而公共網際網路不是為了實時通訊設計的。 系列文章 本文是系列文章中的第2篇,本系列文章的大綱如下:   《即時

即時通訊音視訊開發(一):視訊解碼理論概述

前言 即時通訊應用中的實時音視訊技術,幾乎是IM開發中的最後一道高牆。原因在於:實時音視訊技術 = 音視訊處理技術 + 網路傳輸技術 的橫向技術應用集合體,而公共網際網路不是為了實時通訊設計的。 系列文章 《即時通訊音視訊開發(二):視訊編解碼之數字視訊介紹》 《即時通訊音

iOS學習-即時通訊音視訊(一)視訊解碼理論

參考大神:http://www.52im.net/thread-228-1-1.html 實時音視訊技術 = 音視訊處理技術 + 網路傳輸技術 的橫向技術應用集合體。 視訊為何需要壓縮? 1.未經壓縮的數字視訊的資料量巨大 2. 儲存困難,一張DVD只能儲存幾秒鐘的未壓縮數字視訊 3. 傳輸困難 1

Java序列Jackson-databind

這個洞的cve編號:CVE-2017-17485,漏洞環境就如第一個連結那樣,jdk需要在jdk 1.8以上。 先看一下Jackson-databind的用法,說白了就是將json轉換成物件。 test-legit.json程式碼如下 {"id":123} 執行結果如圖: 如果注入的json程式碼如下程

Java序列排除被序列欄位(transient/靜態變數)

 我們都知道一個物件只要實現了Serilizable介面,這個物件就可以被序列化,java的這種序列化模式為開發者提供了很多便利,我們可以不必關係具體序列化的過程,只要這個類實現了Serilizable介面,這個類的所有屬性和方法都會自動序列化。       然而在實際