Day15.高效能RPC設計 學習筆記3 - Netty
一、Netty引言
基於NIO一款非同步通訊框架,因為在使用上相比較Mina
較為簡單,開發門檻低導致了Netty在網際網路開發中受到絕大多數商用專案成功驗證,導致了Netty成為NIO開發的首選框架。
“快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括 FTP,SMTP,HTTP,各種二進位制,文字協議,並經過相當精心設計的專案,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的效能,穩定性和伸縮性。
在設計上: 針對多種傳輸型別的統一介面 - 阻塞和非阻塞; 簡單但更強大的執行緒模型;真正的無連線的資料報套接字支援;連結邏輯支援複用;
在效能上:比核心 Java API 更好的吞吐量,較低的延時;資源消耗更少,這個得益於共享池和重用;減少記憶體拷貝
在健壯性上:消除由於慢,快,或過載連線產生的 OutOfMemoryError;消除經常發現在 NIO 在高速網路中的應用中的不公平的讀/寫比
在安全上:完整的 SSL / TLS 和 StartTLS 的支援且已得到大量商業應用的真實驗證, 如: Hadoop 專案的 Avro (RPC 框架)、 Dubbo、 Dubbox等RPC框架
二、例項
- 匯入依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
- Netty的服務端開發
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//①、啟動服務引導 框架啟動引導類,遮蔽網路通訊配置資訊
ServerBootstrap sbt= new ServerBootstrap();
//②、建立請求轉發,響應執行緒池
NioEventLoopGroup master = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
//③、關聯執行緒池組
sbt.group(master,worker) ;
//④、設定服務端實現類
sbt.channel(NioServerSocketChannel.class);
//⑤、配置服務端RPC通訊管道 【關注點】
sbt.childHandler(new ServerChannelInitializer());
//⑥、設定伺服器的埠並且啟動服務
System.out.println("我在9999監聽...");
ChannelFuture f = sbt.bind(9999).sync();
//⑦、關閉通訊管道
f.channel().closeFuture().sync();
//⑧、釋放資源
master.shutdownGracefully();
worker.shutdownGracefully();
}
}
- Netty的客戶端開發(Server端改動下)
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//①、啟動服務引導 框架啟動引導類,遮蔽網路通訊配置資訊
Bootstrap bt= new Bootstrap();
//②、建立請求轉發,響應執行緒池
NioEventLoopGroup worker = new NioEventLoopGroup();
//③、關聯執行緒池組
bt.group(worker);
//④、設定服務端實現類
bt.channel(NioSocketChannel.class);
//⑤、配置客戶端RPC通訊管道 【關注點】
bt.handler(new ClientChannelInitializer());
//⑥、設定伺服器的埠並且啟動服務
ChannelFuture f = bt.connect("127.0.0.1",9999).sync();
//⑦、關閉通訊管道
f.channel().closeFuture().sync();
//⑧、釋放資源
worker.shutdownGracefully();
}
}
- 總結下API記憶技巧:
第一步、建立服務引導(服務端ServerBootstrap與客戶端Bootstrap);
第二步、確定網路程式設計執行緒(服務端兩個,客戶端一個);
第三步、關聯起來;
第四步、實現類(服務端NioServerSocketChannel和客戶端NioSocketChannel);
第五步、配置自定義的RPC通訊管道;
第六步、服務端啟動服務,客戶端多設定ip啟動服務;
第七步、關閉通訊管道
第八步、釋放資源
三、Netty的管道淺析 —— “學Netty,就是學管道”
- 解釋:
管道:訊息交換的介質
環:加功能、不同環只處理固定方向的stream
紅色:輸入upstream、編碼器、將物件轉成位元組流、先編碼,再編幀
橙色:輸出downstream、解碼器、將位元組流轉成物件、先解幀,再解碼
編碼 - 將物件變成位元組、編幀 - 切分物件,方便解幀、解幀 - 固定規則拆分、解碼 - 位元組變成物件
管道末端:掛載最終處理者,作用收發物件
管道前端:掛載編解碼器
所以最終關注的,需要書寫的是,最終處理者和編解碼器
四、開發管道
- 初始化管道末端
xxxChannelInitializer
- ServerChannelInitializer
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//通訊管道
ChannelPipeline pipeline = ch.pipeline();
//掛載最終處理者
pipeline.addLast(new ServerChannelHandlerAdapter());
}
}
- ClientChannelInitializer
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//通訊管道
ChannelPipeline pipeline = ch.pipeline();
//掛載最終處理者
pipeline.addLast(new ClientChannelHandlerAdapter());
}
}
- 掛載最終處理者
xxxChannelHandlerAdapter
Server:
channelRead:被動接收發訊息
exceptionCaught:異常捕獲
Client:
channelActive:傳送訊息
channelRead:接收訊息
exceptionCaught:異常捕獲
- ServerChannelHandlerAdapter
建立服務端最終處理者
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
/**
* 收發訊息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
//關閉通道
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
/**
* 異常捕獲
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
}
- ClientChannelHandlerAdapter
建立客戶端最終處理者
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
/**
* 主動傳送訊息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg="你好,我伺服器!!!!";
ByteBuf buf=ctx.alloc().buffer();
buf.writeBytes(msg.getBytes());
ctx.writeAndFlush(buf);
}
/**
* 被動收發訊息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf= (ByteBuf) msg;
System.out.println("收到:"+buf.toString(CharsetUtil.UTF_8));
}
/**
* 異常捕獲
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
}
五、位元組流ByteBuf基本使用
- ByteBuf
ByteBuf
區別於ByteBuffer
不同。因為ByteBuf是一個可邊長的位元組陣列是Netty提供專門用於操作位元組工具類。
該物件通過readIndex
/writeIndex
控制ByteBuf。在初始情況下readIndex
和writeIndex
都等於0;當用戶寫資料的時候writeIndex
自動增長位元組數,可讀的區域就等於writeIndex-readIndex
。
/**
* 或者其他建立方式 new PooledByteBufAllocator().buffer();
* new UnpooledByteBufAllocator(true).buffer();
*/
ByteBuf buf= Unpooled.buffer(5);// r=0,w=0,c=5(可以自動擴容)
buf.writeByte((byte)'a');
buf.writeByte((byte)'b');
buf.writeByte((byte)'c'); // r=0,w=3,c=5
byte v1 = buf.readByte();
byte v2 = buf.readByte(); // r=2,w=3,c=5
//readableBytes() 剩餘可讀位元組
int i = buf.readableBytes();// w-r=1
//discardReadBytes() 清除已讀過資料,即修正readIndex和writeIndex
buf.discardReadBytes(); // w-=r,r=0 ,c=5
相比於 ByteBuffer操作簡單些。
注意ByteBuf
是Netty提供緩衝區,不能夠給NIO使用,僅僅簡化對緩衝區的操作。省略flip/clear
方法。Netty管道預設只支援傳送ByteBuf
和FileRegion
都代表位元組資料,一般在RPC系統使用ByteBuf
傳送基礎資料(java物件);如果使用者使用Netty開發檔案傳輸系統。使用者可以直接將一個檔案拆分成多個FileRegion
物件.
- FileRegion
將一個檔案切分成多個區間,實現對檔案隨機讀取。(可以實現斷點續傳,並行拷貝)
FileChannel fi=new FileInputStream("xxx檔案A").getChannel();
FileChannel fo=new FileOutputStream("xxx檔案B",true).getChannel();//SocketChannel
//讀取xxx檔案A從0位置開始讀,讀取100個位元組
FileRegion region= new DefaultFileRegion(fi,0,100);
//從當前Region的0位置,拷貝100個位元組資料到Xxx檔案B中
region.transferTo(fo,0);
FileRegion
在網路間傳遞檔案系統。綜上所述Netty的通道只支援傳送以上兩種物件型別,如果使用者希望傳送自定義型別就必須嘗試將傳送的資料轉換為ByteBuf
或者FileRegion
.
六、如何捕獲Netty在傳輸過程中出現的異常資訊?【面試題】
//傳送訊息
ChannelFuture channelFuture = ctx.writeAndFlush(cmd);
//新增序列化異常捕獲
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
//出錯切斷連線
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
物件編碼器,作用於輸出,使得Netty支援傳輸任意Java物件。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import org.apache.commons.lang3.SerializationUtils; import java.io.Serializable; import java.util.List; public class ObjectEncoder extends MessageToMessageEncoder<Object> { /** * * @param ctx * @param msg 需要編碼的物件 * @param out 編碼資料幀 * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { ByteBuf buf=ctx.alloc().buffer(); byte[] bytes= SerializationUtils.serialize((Serializable) msg); buf.writeBytes(bytes); out.add(buf);//生成資料幀 } }
物件解碼,將任意的收到ByteBuf轉換為Object
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.apache.commons.lang3.SerializationUtils; import java.util.List; public class ObjectDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { byte[] bytes=new byte[msg.readableBytes()]; msg.readBytes(bytes); Object obj = SerializationUtils.deserialize(bytes); out.add(obj); } }
七、完整版本Netty服務端【重點】
- HostAndPort
封裝ip和埠
public class HostAndPort {
private String host;
private int port;
public HostAndPort(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
- MethodInvokeMeta
測試用方法物件
public class MethodInvokeMeta implements Serializable {
private String method;//方法名
private Class<?>[] parameterTypes;//引數型別
private Object[] agrs;//引數
private String targetClass;//目標類資訊
public MethodInvokeMeta(String method, Class<?>[] parameterTypes, Object[] agrs, String targetClass) {
this.method = method;
this.parameterTypes = parameterTypes;
this.agrs = agrs;
this.targetClass = targetClass;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getAgrs() {
return agrs;
}
public void setAgrs(Object[] agrs) {
this.agrs = agrs;
}
public String getTargetClass() {
return targetClass;
}
public void setTargetClass(String targetClass) {
this.targetClass = targetClass;
}
@Override
public String toString() {
return "MethodInvokeMeta{" +
"method='" + method + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", agrs=" + Arrays.toString(agrs) +
", targetClass='" + targetClass + '\'' +
'}';
}
}
- NettyServer
簡潔方式NettyServer
幀頭圖圖
public class NettyServer {
//1.啟動服務引導 框架啟動引導類,遮蔽網路通訊配置資訊
private ServerBootstrap sbt = new ServerBootstrap();
//2.建立請求轉發、響應執行緒池
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup worker = new NioEventLoopGroup();
private int port;
public NettyServer(int port) {
this.port = port;
//3.關聯執行緒池組
sbt.group(boss, worker);
//4.設定服務端實現類
sbt.channel(NioServerSocketChannel.class);
}
public static void main(String[] args) throws InterruptedException {
NettyServer nettyServer = new NettyServer(9999);
try {
nettyServer.start();
} finally {
nettyServer.close();
}
}
public void start() throws InterruptedException {
//5.配置服務端RPC通訊管道 - 需要關注的東西
sbt.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//新增資料幀解碼
/**
* @param maxFrameLength 幀的最大長度 '65535'
* @param lengthFieldOffset length欄位偏移的地址 '0'
* @param lengthFieldLength length欄位所佔的位元組長 '2'
* @param lengthAdjustment 修改幀資料長度欄位中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推後多少個欄位 '0'
* @param initialBytesToStrip 解析時候跳過多少個長度 '2'
* @param failFast 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異 '有不寫這個引數的構造'
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
//新增物件解碼器
pipeline.addLast(new ObjectDecoder());
//新增資料幀編碼器
/**
* 兩個位元組長度的幀頭協議
*/
pipeline.addLast(new LengthFieldPrepender(2));
//新增物件編碼器
pipeline.addLast(new ObjectEncoder());
//新增最終處理者
pipeline.addLast(new ChannelHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器收到:" + msg);
//收訊息 ChannelFuture非同步操作的結果
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
//關閉通道,ps測試多物件時關閉
//channelFuture.addListener(ChannelFutureListener.CLOSE);
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
相關推薦
Day15.高效能RPC設計 學習筆記3 - Netty
一、Netty引言
基於NIO一款非同步通訊框架,因為在使用上相比較Mina較為簡單,開發門檻低導致了Netty在網際網路開發中受到絕大多數商用專案成功驗證,導致了Netty成為NIO開發的首選框架。
“快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議
Day16.高效能RPC設計 學習筆記4 - Zookeeper(轉載)
Zookeeper
ZooKeeper 是一個為分散式應用所設計的分佈的、開源的協調服務。可以解決分散式應用中出現常規問題:
同步配置、選舉、分散式鎖、服務命名分組,記住這些問題雖然zookeeper可以幫助使用者解決,並不意味著使用者不需要寫程式碼。使用者如果想使用zookeepe
Day14.高效能RPC設計 學習筆記2
一、通道選擇器
通道註冊:需要使用Selector管理通道,然後將就緒的通道封裝成SelectionKey物件。
設定通道為非阻塞 ServerSocketChannel/SocketChannel#configureBlocking(false)
註冊通道Ser
Day13.高效能RPC設計 學習筆記1
一、引言
系統架構演變 隨著網際網路的發展,網站應用的規模不斷擴大,常規的垂直應用(MVC)架構已無法應對,分散式服務架構以及流動計算架構(伸縮性)勢在必行,亟需一個治理系統確保架構有條不紊的演進。
單一架構:例如早期servlet/jsp - ORM(物件關
系統分析與設計--學習筆記3(用例建模)
一.使用故事板(Storyboard)技術,建模網上訂機票、車票,或訂旅店的過程。
故事前提:你是註冊使用者,並已登陸。
故事起點:發起目標查詢
故事終點:確認訂單(不包含支付)
描述手法:參考 “建模練習” 文件
用“藝龍網”的網上訂酒店來模仿建模:(更多資訊在藝龍網
設計模式:學習筆記(3)——命令者式
抽象 ring code 耦合度 引入 聲明 筆記 操作日誌 string Java設計模式之命令者式
引入命令模式
案列
比如我們要設計一個DOS命令模擬器,它可以接受輸入的命令並做出響應。
0.首先我們的DOS模擬器支持三個大類的功能:文件操作類(FileKit
OPENCV學習筆記3-4_使用模型-視圖-控制器設計應用程序
main hold sse model getters core tpi sso data 此節介紹的架構模式(MVC),將聯合使用模型-視圖-控制器(Model-View-Controller)三個模式以及其他的類。其清晰地分離程序中的邏輯部分與用戶交互部分。此節將使
設計模式學習筆記(3) --工廠方法
1.定義:
工廠方法模式(Factory Method Pattern)又稱為工廠模式,也叫虛擬構造器(Virtual Constructor)模式或者多型工廠(Polymorphic Factory)模式,它屬於類建立型模式。在工廠方法模式中,工廠父類負責定義建立產品物件的公共介面,而工廠子類
DirectX 9 UI設計學習筆記之二:第2章Introducing DirectX+第3章Introducing Direct3D
此文由哈利_蜘蛛俠原創,轉載請註明出處!有問題歡迎聯絡本人!
上一期的地址:
別看這一期似乎要講很多內容,其實大部分是一帶而過的。其實我的重點在於弄了一個框架程式;詳情見本期最後。
第2章 Introducing DirectX
===
Hadoop學習筆記—3.Hadoop RPC機制的使用
一、RPC基礎概念
1.1 RPC的基礎概念
RPC,即Remote Procdure Call,中文名:遠端過程呼叫;
(1)它允許一臺計算機程式遠端呼叫另外一臺計算機的子程式,而不用去關心底層的網路通訊細節,對我們來說是透明的。因此,它經常用於分散式網路通訊中。
RPC協議假定某些傳輸
系統分析與設計學習筆記(一)
學習 掌握 應該 溝通 基本 最終 表示 對象 毫無
為什麽要學習這門課程?
“擁有一把錘子未必能成為建築師”。
這門課程學習的是面向對象分析和設計的核心技能的重要工具。對於使用面向對象技術和語言來,創建設計良好、健壯且可維護的軟件來說,這門課程所
Redis學習筆記3-Redis5個可運行程序命令的使用
運行程序 檢查 mil 數據文件 img usr pre text mod
在redis安裝文章中,說到安裝好redis後,在/usr/local/bin下有5個關於redis的可運行程序。以下關於這5個可運行程序命令的具體說明。
redis-server
Redi
Git學習筆記3——工作區與暫存區,以及版本變更
暫存區 所有 cto tag clas 內容 blank 文件 set 工作區(Working Directory)
就是你在電腦裏能看到的目錄,比如我的Git_Learning文件夾就是一個工作區。
版本庫(Repository)
工作區有一個隱藏目錄.git,這個不
Python學習筆記3:簡單文件操作
name n) popu 元素 close nes pla () eof
# -*- coding: cp936 -*-
# 1 打開文件
# open(fileName, mode)
# 參數:fileName文件名稱
# mode打開方式
# w
Linux程序設計學習筆記——異步信號處理機制
基本概念 erro 驗證 添加 uid 函數 count ubun generate
轉載請註明出處:
http://blog.csdn.net/suool/article/details/38453333
Linux常見信號與處理
基本概念
Linux的信號是一
Linux 程序設計學習筆記----Linux下文件類型和屬性管理
腳本 types.h 沒有 oot 創建 jsb 文件 屬性 文件大小
轉載請註明出處:http://blog.csdn.net/suool/article/details/38318225
部分內容整理自網絡,在此感謝各位大神。
Linux文件類型和權限
數據表示
(MYSQL學習筆記3)mysql兩行數據合並成一行
mysql使用SUM函數,加上GROUP BY人員ID就可以實現了:SELECT SUM(PZ+CPJS+BZ+GC+SB+TG+MJ+CL+CCLW+GJ+ZL+CBZZ) as count,
SUM(PZ) as PZ,SUM(CPJS) as CPJS,SUM(BZ) as BZ,SUM(GC)
javascript 高級程序設計學習筆記(1)
元素 新的 logs html light begin 知識 gin nbsp 知識補充:
var box = document.querySelector(‘#box‘);
//"beforebegin" ,在當前元素之前插入一個緊鄰的同輩元素;
box.ins
AngularJs學習筆記3-服務及過濾器
聲明 運行時 維護 style 函數調用 factor blog 使用場景 需要 距離上次別博客有有一段時間了,因為最近公司和個人事情比較多,也因為學習新的知識所以耽擱了,也有人說Angularjs1.5沒有人用了,沒必要分享,我個人感覺既然開頭了我就堅持把他寫完,
java學習筆記(3)
決定 ati 開始 詳細講解 調用 數據 寄存器 art 筆記 java基礎知識
1:方法
(1)方法:就是完成特定功能的代碼塊。
註意:在很多語言裏面有函數的定義,而在Java中,函數被稱為方法。
(2)格式:
修飾符 返回值類型 方法名(參數類型 參數名1