網際網路技術22——netty編解碼技術與資料通訊
Netty服務部署
常用的部署方式有2中,一種是耦合在Web應用中(以Tomcat為例),使其伴隨Tomcat的啟動而啟動,伴隨Tomcat的關閉而關閉。另外一種則是將Netty獨立打包部署,然後由單獨的程序啟動執行(可以使用shell或其他指令碼進行啟動),然後以資料庫或者其他快取為承接點,實現資料互動。Netty與其他程式進行互動,然後將獲取到的資料進行處理插入資料庫或者快取,然後其他服務從中獲取。獲取在Netty中呼叫web應用的一些對外介面。
Netty編解碼技術
編解碼技術,說白了就是java序列化技術,序列化目的就兩個,第一個進行網路傳輸,第二物件持久化。雖然我們可以使用java進行物件序列化,netty去傳輸,但是java序列化的硬傷太多,比如:java序列化沒法跨語言、序列化後碼流太大、序列化效能太低等等。。
主流的編解碼框架:
- JBoss的Marshalling包
- google的Protobuf
- 基於Protobuf的Kyro (效能高於Protobuf,可以與Marshalling媲美)
- MessagePack框架
Netty結合JBoss Marshalling
JBoss Marshalling是一個java物件序列化包,對JDK預設的序列化框架進行了優化,但又保持跟java.io.Seriallzable介面相容,同時增加了一些可調的引數和附加特性。
類庫:jboss-marshalling1.3.0、jboss-marshalling-serial-1.3.0
這兩個包一定要都匯入,尤其是jboss-marshalling-serial-1.3.0,缺此包可能不會報錯,但是server不能解析傳輸的資料物件,跳過channelRead()而直接執行channelComple()
下載地址:https://www.jboss.org/jbossmarshalling/downloads/
maven路徑:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>1.3.0.GA</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.3.0.GA</version> </dependency>
下面的程式碼我直接結合著長連線超時重連一起演示,即資料通訊
資料通訊
一般在專案中我們更改如何使用netty呢?大體上對於一些茶樹配置都是根據伺服器效能決定的,這個不是最主要的,我們需要考慮的問題是兩天機器(甚至是多臺)使用netty怎樣進行通訊,一般分為三種:
1. 第一種,使用長連線通道不斷開的形式進行通訊,也就是伺服器和客戶端一直處於開啟狀態,如果伺服器效能足夠好,並且我們客戶端的數量也比較少的情況下,還是可以採用這種方式的。
2.第二種,一次性批量提交資料,採用短連線方式。也就是我們會把資料儲存咋及本地臨時緩衝區或者臨時表裡,當達到臨界值時進行一次性批量提交,又或者根據定時任務輪詢提交,這種情況弊端是做不到實時性傳輸,在對實施性不高的應用程式彙總可以推薦使用。
3.第三種,使用一種特殊的長連線,在指定某一時間內,伺服器與某客戶端沒有任何通訊則斷開連線,下次連線則是客戶端發起請求的時候再次建立連線,但是這種模式我們需要考慮兩個因素:
(1)如何在超時(即伺服器和客戶端沒有任何通訊)後關閉通道,關閉通道後我們又如何再次建立連線?答:可以使用連線的時候新增判斷,如果連線狀態可用直接使用,如果斷開則重新建立連線
(2)客戶端宕機時,我們無需考慮,下次客戶端重啟之後,我們就可以與伺服器建立連線,但是伺服器宕機時,我們的客戶端如何與伺服器進行連線呢?答:可以在客戶端使用定時任務去輪詢連線,知道連線成功建立為止。
結合上述編解碼技術和資料通訊的第三種方式,做一個統一的演示
netty已經為我們提供了超時機制,只需要在bootstrap中初始化ChannelInitializer時增加超時處理器ReadTimeOutHandler就可以了
ch.pipeline().addLast(new ReadTimeoutHandler(5));單位預設我秒,理論上在一段增加就可以了,為了雙重保險,建議客戶端和服務端同時配置。另外注意在資料傳送完畢後不能新增監聽去關閉(writeFlush.addListenser(ChannelFutureListener.CLOSE),因為該監聽會立即斷開連線。
序列化工具Marshalling部分的封裝:MarshallingCodeCFactory.java
package com.nettyCopyOk;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
public final class MarshallingCodeCFactory {
/**
* 建立Jboss Marshalling解碼器MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通過Marshalling工具類的getProvidedMarshallerFactory靜態方法獲取MarshallerFactory例項
//引數“serial”表示建立的是Java序列化工廠物件,它由jboss-marshalling-serial-1.3.0.CR9.jar提供。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//建立了MarshallingConfiguration物件
final MarshallingConfiguration configuration = new MarshallingConfiguration();
//將它的版本號設定為5
configuration.setVersion(5);
//然後根據MarshallerFactory和MarshallingConfiguration建立UnmarshallerProvider例項
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//最後通過建構函式建立Netty的MarshallingDecoder物件
//它有兩個引數,分別是UnmarshallerProvider和單個訊息序列化後的最大長度。
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
/**
* 建立Jboss Marshalling編碼器MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//建立MarshallerProvider物件,它用於建立Netty提供的MarshallingEncoder例項
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//MarshallingEncoder用於將實現序列化介面的POJO物件序列化為二進位制陣列。
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
請求資料載體類:SubscribeReq.java
package com.nettyCopyOk;
import java.io.Serializable;
public class SubscribeReq implements Serializable {
/**
* 預設的序列號ID
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private String userName;
private String productName;
private String phoneNumber;
private String address;
public static long getSerialVersionUID() {
return serialVersionUID;
}
public int getSubReqID() {
return subReqID;
}
public void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getPhoneNumber() {
return phoneNumber;
}
public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName
+ ", productName=" + productName + ", phoneNumber="
+ phoneNumber + ", address=" + address + "]";
}
}
響應資料載體類:SubscribeResp.java
package com.nettyCopyOk;
import java.io.Serializable;
public class SubscribeResp implements Serializable {
/**
* 預設序列ID
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private int respCode;
private String desc;
public static long getSerialVersionUID() {
return serialVersionUID;
}
public int getSubReqID() {
return subReqID;
}
public void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public int getRespCode() {
return respCode;
}
public void setRespCode(int respCode) {
this.respCode = respCode;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode
+ ", desc=" + desc + "]";
}
}
服務端實際處理邏輯:SubReqServerHandler.java
package com.nettyCopyOk;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//經過解碼器handler ObjectDecoder的解碼,
//SubReqServerHandler接收到的請求訊息已經被自動解碼為SubscribeReq物件,可以直接使用。
SubscribeReq req = (SubscribeReq) msg;
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscribe req : ["
+ req.toString() + "]");
//對訂購者的使用者名稱進行合法性校驗,校驗通過後列印訂購請求訊息,構造訂購成功應答訊息立即傳送給客戶端。
ctx.writeAndFlush(resp(req.getSubReqID()));
}
}
private SubscribeResp resp(int subReqID) {
SubscribeResp resp = new SubscribeResp();
resp.setSubReqID(subReqID);
resp.setRespCode(0);
resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return resp;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
System.out.println("發生異常");
ctx.close();// 發生異常,關閉鏈路
}
}
客戶端實際處理邏輯:SubReqClientHandler.java
package com.nettyCopyOk;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class SubReqClientHandler extends ChannelHandlerAdapter {
public SubReqClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//由於物件解碼器已經對訂購請求應答訊息進行了自動解碼,
//因此,SubReqClientHandler接收到的訊息已經是解碼成功後的訂購應答訊息。
System.out.println("Receive server response : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
服務端程式碼:server.java
package com.nettyCopyOk;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
/**
* Created by BaiTianShi on 2018/9/12.
*/
public class Server {
public void bind(int port) throws Exception {
// 配置服務端的NIO執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) {
//通過工廠類建立MarshallingEncoder解碼器,並新增到ChannelPipeline.
ch.pipeline().addLast(com.testMarshaLing.MarshallingCodeCFactory.buildMarshallingDecoder());
//通過工廠類建立MarshallingEncoder編碼器,並新增到ChannelPipeline中。
ch.pipeline().addLast(com.testMarshaLing.MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new SubReqServerHandler());
}
});
// 繫結埠,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服務端監聽埠關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放執行緒池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
new Server().bind(port);
}
}
客戶端程式碼SubReqClient.java
package com.nettyCopyOk;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
public class SubReqClient {
private static class SingletonHolder {
static final SubReqClient instance = new SubReqClient();
}
public static SubReqClient getInstance(){
return SingletonHolder.instance;
}
private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;
private SubReqClient(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch)
throws Exception {
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ReadTimeoutHandler(5));
ch.pipeline().addLast(new SubReqClientHandler());
}
});
}
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch)
throws Exception {
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new SubReqClientHandler());
}
});
// 發起非同步連線操作
ChannelFuture f = b.connect(host, port).sync();
System.out.println("向服務端傳送請求資料");
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
public void connect(){
try {
this.cf = b.connect( "127.0.0.1", 8080).sync();
System.out.println("遠端伺服器已經連線, 可以進行資料交換..");
} catch (Exception e) {
e.printStackTrace();
}
}
public ChannelFuture getChannelFuture(){
if(this.cf == null){
this.connect();
}
if(!this.cf.channel().isActive()){
this.connect();
}
return this.cf;
}
public static void main(String[] args) throws Exception {
final SubReqClient c = SubReqClient.getInstance();
ChannelFuture cf = c.getChannelFuture();
for (int i = 0; i < 3; i++) {
SubscribeReq req = new SubscribeReq();
req.setAddress("南京市江寧區方山國家地質公園");
req.setPhoneNumber("138xxxxxxxxx");
req.setProductName("Netty For Marshalling");
req.setSubReqID(i);
req.setUserName("Lilinfeng");
cf.channel().writeAndFlush(req);
// TimeUnit.SECONDS.sleep(3);
}
// cf.channel().flush();
cf.channel().closeFuture().sync();
TimeUnit.SECONDS.sleep(4);
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入子執行緒...");
ChannelFuture cf = c.getChannelFuture();
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());
//再次傳送資料
SubscribeReq req = new SubscribeReq();
req.setAddress("南京市江寧區方山國家地質公園");
req.setPhoneNumber("138xxxxxxxxx");
req.setProductName("Netty For Marshalling");
req.setSubReqID(10);
req.setUserName("Lilinfeng");
cf.channel().writeAndFlush(req);
cf.channel().closeFuture().sync();
System.out.println("子執行緒結束.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("斷開連線,主執行緒結束..");
}
}
udp通訊和壓縮傳輸和解壓等可看下面的例子
udp通訊:http://blog.csdn.net/mffandxx/article/details/53264172
Netty實現Websocket開發:http://www.cnblogs.com/wunaozai/p/5240006.html
netty實現壓縮和解壓傳輸檔案:https://blog.csdn.net/zbw18297786698/article/details/53678133