Netty原始碼閱讀
Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的呼叫過程,從這裡面進一步理解Netty的執行緒模型,以及Reactor模式。
這是我畫的一個Netty啟動過程中使用到的主要的類的概要類圖,當然是用到的類比這個多得多,而且我也忽略了各個類的繼承關係,關於各個類的細節,可能以後會寫單獨的部落格進行分析。在這裡主要注意那麼幾個地方:
1. ChannelPromise關聯了Channel和Executor,當然channel中也會有EventLoop的例項。 2. 每個channel有自己的pipeline例項。 3. 每個NioEventLoop中有自己的Executor例項和Selector例項。
網路請求在NioEventLoop中進行處理,當然accept事件也是如此,它會把接收到的channel註冊到一個EventLoop的selector中,以後這個channel的所有請求都由所註冊的EventLoop進行處理,這也是Netty用來處理競態關係的機制,即一個channel的所有請求都在一個執行緒中進行處理,也就不會存在跨執行緒的衝突,因為這些呼叫都執行緒隔離了。
下面我們先看一段Netty原始碼裡面帶的example程式碼,直觀感受一下Netty的使用:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG , 100) // 設定tcp協議的請求等待佇列
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
首先我們先來了解Netty的主要類:
EventLoop 這個相當於一個處理執行緒,是Netty接收請求和處理IO請求的執行緒。
EventLoopGroup 可以理解為將多個EventLoop進行分組管理的一個類,是EventLoop的一個組。
ServerBootstrap 從命名上看就可以知道,這是一個對服務端做配置和啟動的類。
ChannelPipeline 這是Netty處理請求的責任鏈,這是一個ChannelHandler的連結串列,而ChannelHandler就是用來處理網路請求的內容的。
ChannelHandler 用來處理網路請求內容,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannlPipeline會從頭到尾順序呼叫ChannelInboundHandler處理網路請求內容,從尾到頭呼叫ChannelOutboundHandler處理網路請求內容。這也是Netty用來靈活處理網路請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網路協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。
現在看上面的程式碼,首先建立了兩個EventLoopGroup物件,作為group設定到ServerBootstrap中,然後設定Handler和ChildHandler,最後呼叫bind()方法啟動服務。下面按照Bootstrap啟動順序來看程式碼。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
首先是設定EverLoopGroup,parentGroup一般用來接收accpt請求,childGroup用來處理各個連線的請求。不過根據開發的不同需求也可以用同一個group同時作為parentGroup和childGroup同時處理accpt請求和其他io請求。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
接下來的channel()方法設定了ServerBootstrap的ChannelFactory,這裡傳入的引數是NioServerSocketChannel.class,也就是說這個ReflectiveChannelFactory建立的就是NioServerSocketChannel的例項。
後面的option(),handler()和childHandler()分別是設定Socket連線的引數,設定parentGroup的Handler,設定childGroup的Handler。childHandler()傳入的ChannelInitializer實現了一個initChannel方法,用於初始化Channel的pipeline,以處理請求內容。
之前都是在對ServerBootstrap做設定,接下來的ServerBootstrap.bind()才是啟動的重頭戲。我們繼續按照呼叫順序往下看。
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
// AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
我們可以看到bind()的呼叫最終呼叫到了doBind(final SocketAddress),在這裡我們看到先呼叫了initAndRegister()方法進行初始化和register操作。瞭解JavaNIO框架的同學應該能看出來是在這個方法中將channel註冊到selector中的。最後程式再呼叫了doBind0()方法進行繫結,先按照順序看initAndRegister方法做了什麼操作。
// AbstractBootstrap
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// ...
}
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}
為了簡單其間,我忽略了處理異常分支的程式碼,同學們有興趣可以自行下載Netty原始碼對照。在這裡終於看到channel的建立了,呼叫的是ServerBootstrap的channelFactory,之前的程式碼我們也看到了這裡的工廠是一個ReflectChannelFactory,在建構函式中傳入的是NioServerSocketChannel.class,所以這裡建立的是一個NioServerSocketChannel的物件。接下來init(channel)對channel進行初始化。
// ServerBootstrap
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// 設定channel.attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
// childGroup的handler
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 給channelpipeline新增handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// group的handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
先是設定了channel的option和attr,然後將handler加入到channelpipleline的handler鏈中,這裡大家請特別注意ServerBootstrapAcceptor這個Handler,因為接下來對於客戶端請求的處理以及工作channl的註冊可全是這個Handler處理的。不過由於現在channel還沒有註冊,所以還不會呼叫initChannel()方法,而是將這個handler對應的context加入到一個任務佇列中,等到channel註冊成功了再執行。關於ChannelPipeline的內容我們以後再說。然後在initAndRegister()方法中呼叫config().group().register(channel)對channel進行註冊。config().group()獲取到的其實就是bossGroup,在這個例子中就是一個NioEventLoopGroup,由於它繼承了MultithreadEventLoopGroup所以這裡呼叫的其實是這個類的方法。
// MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventLoop next() {
return (EventLoop) super.next();
}
// SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
這裡會獲取EventLoopGroup中的一個EventLoop,其實我們用的是NioEventLoopGroup所以這裡獲取到的其實是NioEventLoop,而NioEventLoop繼承了SingleThreadEventLoop,這裡register方法呼叫的就是SingleThreadEventLoop中的方法。我們重遇來到了channel最終註冊的地方,這裡其實是呼叫了channel的unsafe物件中的register方法,也就是NioServerSocketChannel的方法,這個方法是在AbstractChannel祖先類中實現的,程式碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(
相關推薦
Netty 原始碼閱讀之初始環境搭建
推薦 netty 系列原始碼解析合集
http://www.iocoder.cn/Netty/Netty-collection/?aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3R6c18xMDQxMjE4MTI5L2FydGljbGUvZGV0YWlscy83OD
netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件
在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入:
boolean scavenge() {
// con
netty原始碼閱讀之解碼值基於固定長度解碼器分析
固定長度解碼器FixedLengthFrameDecoder比較簡單,我們看下它類的註釋:
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes.
netty原始碼閱讀之解碼之基於長度域解碼器引數分析
這篇文章我們放鬆一點,只分析基於長度域解碼器的幾個引數,
lengthFieldOffset :長度域的偏移量,也就是長度域要從什麼地方開始
lengthFieldLength:長度域的長度,也就是長度域佔多少個位元組
lengthAdjustment:長度域的值的調整
netty原始碼閱讀之解碼之基於長度域解碼器分析
基於長度域解碼器LengthFieldBasedFrameDecoder我們主要分析以下三點:
1、計算需要抽取的資料包的長度
2、跳過位元組邏輯處理
3、丟棄模式下的處理
首先原始碼還是LengthFieldBasedFrameDecoder的decode方法:
netty原始碼閱讀之編碼之MessageToByteEncoder
MessageToByteEncoder的write過程,我們分析以下幾步:
1、匹配物件
2、分配記憶體
3、編碼實現
4、釋放物件
5、傳播資料
6、釋放記憶體
原始碼在這裡:
@Override
public void write(Cha
netty原始碼閱讀之效能優化工具類之FastThreadLocal的使用
先說明FastThreadLocal使用的效果。
1、比jdk原生的ThreadLocal的快
2、不同執行緒之間能保證執行緒安全
這是我們的使用者程式碼:
public class FastThreadLocalTest {
private static F
netty原始碼閱讀之效能優化工具類之FastThreadLocal的建立
建立的話我們直接從FastThreadLocal的構造方法進入:
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
可見他是現
netty原始碼閱讀之效能優化工具類之Recycler獲取物件
Recycler獲取物件主要分為以下幾部分:
1、獲取當前執行緒的Stack
2、從Stack裡面彈出物件
3、如果彈出物件為空,那就建立物件並且繫結到Stack裡面
我們從Recycler的get方法進入,就是這個原始碼:
@SuppressWarnin
netty原始碼閱讀與分析----HashedWheelTimer
netty是一個基礎通訊框架,管理的連線數較多,可能多至百萬級,每一個連線都有或多或少有超時任務,比如傳送資料超時,心跳檢測等。如果為每一個連線都啟動一個Timer,不僅效率低下,而且佔用資源。基於論文Hashed and hierarchical timing wheels
Netty 原始碼閱讀 —— 服務端建立
之前專案中用過netty,這次趁著面試空閒時間,重新梳理一遍netty原始碼,從服務端建立開始,話不多說,直接上程式碼先看看netty服務端建立的整體程式碼,大概如下所示:public void bind(int port) throws Exception {
Ev
netty原始碼閱讀之NioEventLoop之NioEventLoop執行-----runAllTask
processSelectedKey()和runAllTask()
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
Netty原始碼閱讀
Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的呼叫過程,從這裡面進一步理解Netty的執行緒模型
Netty原始碼閱讀之如何將TCP的讀寫操作和指定執行緒繫結
**原文連結**:[http://xueliang.org/article/detail/20200712234015993](http://xueliang.org/article/detail/20200712234015993)
# 前言
在Netty的執行緒模型中,對於一個TCP連線的讀寫操作,都是
01 storm 原始碼閱讀 storm的程序間訊息通訊實現netty server實現
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* dist
02 storm 原始碼閱讀 storm的程序間訊息通訊實現netty client實現
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* dist
Mac下一款不錯的原始碼閱讀軟體
1、支援多語言:Ada, C, C++, C#, Java, FORTRAN, Delphi, Jovial, and PL/M ,混合語言的project也支援
2、多平臺: Windows/Linux/Solaris/HP-UX/IRIX/MAC OS X
3、程式碼語法高亮、程式碼折迭
Memcache-Java-Client-Release原始碼閱讀(之七)
一、主要內容 本章節的主要內容是介紹Memcache Client的Native,Old_Compat,New_Compat三個Hash演算法的應用及實現。
二、準備工作 1、伺服器啟動192.168.0.106:11211,192.168.0.106:11212兩個服務端例項。
Memcache-Java-Client-Release原始碼閱讀(之六)
一、主要內容 本章節的主要內容是介紹Memcache Client的一致性Hash演算法的應用及實現。
二、準備工作 1、伺服器啟動192.168.0.106:11211,192.168.0.106:11212兩個服務端例項。 2、示例程式碼:
String[] serve
Netty 原始碼分析之拆包器的奧祕
為什麼要粘包拆包
為什麼要粘包
首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資