自頂向下深入分析Netty(四)--EventLoop-1
netty執行緒模型
我們再次回顧這幅圖,通過先前的講解,現在是不是親切很多了。圖中綠色的acceptor應該是你最熟悉的部分,之前我們在ServerBootstrap中進行了詳細分析。我們知道了mainReactor是一個執行緒池,處理Accept事件負責接受客戶端的連線;subReactor也是一個執行緒池,處理Read(讀取客戶端通道上的資料)、Write(將資料寫入到客戶端通道上)等事件。在這一節中,我們將深入分析這兩個執行緒池的實現,不斷完善其中的細節。我們首先從類圖開始。
4.1 類圖
EventLoop類圖
看到這幅類圖,如果你的第一印象是氣勢恢巨集,那麼恭喜你,你已經成功了一半。但不難預料的是,大多數人和我的感受是一樣的:這麼多類,一定很累。好在這只是第一印象,我們仔細觀察,便會發現其中明顯的脈絡,兩條線索(這裡使用自下而上):NioEventLoop以及NioEventLoopGroup即執行緒和執行緒池。忽略其中大量的介面,剩餘這樣的兩條線:
NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor -->
AbstractScheduledEventExecutor --> AbstractScheduledEventExecutor -->
AbstractEventExecutor --> AbstractExecutorService
NioEventLoopGroup --> MultithreadEventLoopGroup -->
MultithreadEventExecutorGroup --> AbstractEventExecutorGroup
下面我們正式開始分析,依舊使用自頂向下的方法,從類圖頂部向下、從執行緒池到執行緒分析。
4.2 EventExecutorGroup
EventExecutorGroup在類圖中處於承上啟下的位置,其上是Java原生的介面和類,其下是Netty新建的介面和類,由於它處於如此重要的位置,我們詳細分析其中的方法。
4.2.1 Executor
首先看其繼承自Executor的方法:
// Executes the given command at some time in the future
void execute (Runnable command);
只有一個簡單的execute()方法,但這個方法奠定了java併發的基礎,提供了非同步執行任務的。
4.2.2 ExecutorService
ExecutorService的關鍵方法如下(其中的invoke*方法並非關鍵,不再列出):
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
這些方法我們能從命名中便能知道方法的作用。我們主要看submit()方法,該方法是execute()方法的擴充套件,相較於execute不關心執行結果,submit返回一個非同步執行結果Future。這無疑是很大的進步,但這裡的Future不提供回撥操作,顯得很雞肋,所以Netty將Java原生的java.util.concurrent.Future擴充套件為io.netty.util.concurrent.Future,我們將在之後進行介紹。
4.2.3 ScheduledExecutorService
從名字可以看出,該介面提供了一系列排程方法:
ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
long period,TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit);
schedule()方法排程任務使任務在延遲一段時間後執行。scheduleAtFixedRate延遲一段時間後以固定頻率執行任務,scheduleWithFixedDelay延遲一段時間後以固定延時執行任務。是不是有點頭暈?那就對了,這裡有一個例子專門治頭暈。專家建議程式設計師應該每小時工作50分鐘,休息10分鐘,類似這樣:
13:00 - 13:10 休息
13:10 - 14:00 寫程式碼
14:00 - 14:10 休息
14:10 - 15:00 寫程式碼
實現這樣的排程我們可以使用(假設現在時間為13:00):
executor.scheduleAtFixedRate(new RestRunnable(), 0 , 60, TimeUnit.MINUTES);
executor.scheduleWithFixedDelay(new RestRunnable(), 0 , 50, TimeUnit.MINUTES);
4.2.4 EventExecutorGroup
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
EventExecutor next();
EventExecutorGroup擴充套件的方法有5個,前四個可以從命名中推斷出功能。shutdownGracefully()我們已經在Bootstrap一節中使用過,優雅關閉執行緒池;terminationFuture()返回執行緒池終止時的非同步結果。重點關注next()方法,該方法的功能是從執行緒池中選擇一個執行緒。EventExecutorGroup還覆蓋了一些方法,我們不再列出,如果你感興趣可以去原始碼裡面檢視,需要注意的是,覆蓋的方法大部分是將Java原生的java.util.concurrent.Future返回值覆蓋為io.netty.util.concurrent.Future。
4.3 執行緒池
4.3.1 AbstractEventExecutorGroup
AbstractEventExecutorGroup實現了EventExecutorGroup介面的大部分方法,實現都長的和下面的差不多:
@Override
public void execute(Runnable command) {
next().execute(command);
}
從這段程式碼可以看出這個執行緒池和程式設計師有一個相同點:懶。當執行緒池執行一個任務或命令時,步驟是這樣的:(1).找一個執行緒。(2).交給執行緒執行。
4.3.2 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup實現了執行緒的建立和執行緒的選擇,其中的欄位為:
// 執行緒池,陣列形式可知為固定執行緒池
private final EventExecutor[] children;
// 執行緒索引,用於執行緒選擇
private final AtomicInteger childIndex = new AtomicInteger();
// 終止的執行緒個數
private final AtomicInteger terminatedChildren = new AtomicInteger();
// 執行緒池終止時的非同步結果
private final Promise<?> terminationFuture =
new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 執行緒選擇器
private final EventExecutorChooser chooser;
MultithreadEventExecutorGroup的構造方法很長,我們將選出其中的關鍵部分分析,故不列出整體程式碼。如果你是處女座,這裡有一個連結MultithreadEventExecutorGroup。
我們先看構造方法簽名:
protected MultithreadEventExecutorGroup(int nThreads,
ThreadFactory threadFactory, Object... args)
其中的nThreads表示執行緒池的固定執行緒數。
MultithreadEventExecutorGroup初始化的步驟是:
(1).設定執行緒工廠
(2).設定執行緒選擇器
(3).例項化執行緒
(4).設定執行緒終止非同步結果
首先我們看裝置執行緒工廠的程式碼:
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
protected ThreadFactory newDefaultThreadFactory(),() {
return new DefaultThreadFactory(getClass());
}
如果構造引數threadFactory為空則使用預設執行緒池,建立預設執行緒池使用newDefaultThreadFactory(),這是一個protected方法,可以在子類中覆蓋實現。
接著我們看設定執行緒選擇器的程式碼:
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
如果執行緒數是2的冪次方使用2的冪次方選擇器,否則使用通用選擇器。下次如果有面試官問你怎麼判斷一個整數是2的冪次方,請甩給他這一行程式碼:
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
Netty實現了兩個執行緒選擇器,雖然程式碼不一致,功能都是一樣的:每次選擇索引為上一次所選執行緒索引+1的執行緒。如果你沒看明白程式碼的含義,沒關係,再看一遍。
private interface EventExecutorChooser {
EventExecutor next();
}
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
最佳實踐:執行緒池數量使用2的冪次方,這樣執行緒池選擇執行緒時使用位操作,能使效能最高。
下面我們接著分析例項化執行緒的步驟:
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 使用模板方法newChild例項化一個執行緒
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// 如果不成功,所有已經例項化的執行緒優雅關閉
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 確保已經例項化的執行緒終止
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
實現的過程一句話描述就是:使用newChild()依次例項化執行緒,如果出錯,關閉所有已經例項化的執行緒。也許你對finally中的程式碼有疑問,這是因為不清楚shutdownGracefully()的含義。你需要提前明白這樣的事實:shutdownGracefully()只是通知執行緒池該關閉,但什麼時候關閉由執行緒池決定,所以需要使用e.isTerminated()來判斷執行緒池是否真正關閉。
例項化執行緒池正常完成後,Netty使用下面的程式碼設定非同步終止結果:
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 執行緒池中的執行緒每終止一個增加記錄數,直到全部終止設定執行緒池非同步終止結果為成功
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
分析完MultithreadEventExecutorGroup的構造方法,我們繼續分析普通方法。它的普通方法基本與下面的isTerminated()類似:
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
總結起來就是:執行緒池的狀態由其中的各個執行緒決定。明白了這點,我們使用類比的方法可以推知其他方法的實現,故不再具體分析。
4.3.3 MultithreadEventLoopGroup
MultithreadEventLoopGroup實現了EventLoopGroup介面的方法,EventLoopGroup介面作為Netty併發的關鍵介面,我們看其中擴充套件的方法:
// 將通道channel註冊到EventLoopGroup中的一個執行緒上
ChannelFuture register(Channel channel);
// 返回的ChannelFuture為傳入的ChannelPromise
ChannelFuture register(Channel channel, ChannelPromise promise);
// 覆蓋父類介面的方法,返回EventLoop
@Override EventLoop next();
這些方法在MultithreadEventLoopGroup的具體實現很簡單。register()方法選擇一個執行緒,該執行緒負責具體的register()實現。next()方法使用父類實現,即使用上一節所述的選擇器選擇一個執行緒。程式碼如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
分析完這些程式碼,我們關注一下執行緒數的預設設定。
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
預設情況,執行緒數最小為1,如果配置了系統引數io.netty.eventLoopThreads
,設定為該系統引數值,否則設定為核心數的2倍。
4.3.4 NioEventLoopGroup
NioEventLoopGroup的主要程式碼實現是模板方法newChild(),用來建立執行緒池中的單個執行緒,程式碼如下:
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args)
throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(),
(RejectedExecutionHandler) args[2]);
}
關於程式碼中的引數含義,我們放在NioEventLoop中分析。此外NioEventLoopGroup還提供了setIoRatio()和rebuildSelectors()兩個方法,一個用來設定I/O任務和非I/O任務的執行時間比,一個用來重建執行緒中的selector來規避JDK的epoll 100% CPU Bug。其實現也是依次設定各執行緒的狀態,故不再列出。
相關推薦
自頂向下深入分析Netty(四)--EventLoop-1
netty執行緒模型 我們再次回顧這幅圖,通過先前的講解,現在是不是親切很多了。圖中綠色的acceptor應該是你最熟悉的部分,之前我們在ServerBootstrap中進行了詳細分析。我們知道了mainReactor是一個執行緒池,處理Accept事件負責接受客戶
自頂向下深入分析Netty(三)--Bootstrap
1.使用示例 首先使用Netty構造如圖所示的框架,原始碼如下: // 指定mainReactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 指定subReactor EventLoopGr
自頂向下深入分析Netty(二)--執行緒模型
上面這幅圖描述了netty的執行緒模型,其中mainReacotor,subReactor,Thread Pool是三個執行緒池。mainReactor負責處理客戶端的連線請求,並將accept的連線註冊到subReactor的其中一個執行緒上;subReactor負責處理客戶端通道上的資料讀
自頂向下深入分析Netty(一)--預備知識
netty是基於Java NIO封裝的網路通訊框架,只有充分理解了Java NIO才能理解好netty的底層設計。Java NIO有幾個重要的概念Channel,Buffer,Selector。NIO是基於Channel和Buffer操作的,資料只能通過Buffer寫入到Channel或者從Chan
第四章——自頂向下語法分析方法
一、語法分析器的功能 語法分析器的任務:判斷所給單詞串是不是給定文法的正確句子。 1、確定的自頂向下分析思想: 從文法的識別符號出發、根據當前的輸入符號、唯一的確定一個產生式、用產生式右部的符號串代替相應的非終結符往下推。能構造成功則是句子,否則不是。 2、什麼
計算機網路自頂向下 :應用層(FTP、SMTP、POP3、IMAP)
題外話,最近補習數學課還是很有成效的。 關於應用層是分三次寫的,因此寫的重點是在HTTP協議和DNS協議分析上。而應用層中FTP、SMTP、POP3、IMAP,計算機網路自頂向下中並沒有太詳細的介紹。我也試著看RFC然後抓包分析一下過程,採用的抓包方法是FTP
計算機網路自頂向下 :應用層(DNS,POP)
DNS是: 一個分散式DNS伺服器實現分佈或資料庫。 一個使得主機能夠查詢分佈資料庫的應用層協議。 DNS伺服器通常是執行BIND軟體的Linux機器。 DNS協議執行在UDP之上使用53埠。 DNS提供的服務: DNS除了進行主機到IP地址的
文法分析小結:自底向上的分析方法和自頂向下的分析方法有哪些
首先注意一點:無論是那種語法分析,語法都是從左至右的讀入符號! 自底向上分析法,也稱移進-歸約分析法。 它的實現思想是對輸入符號串自左向右進行掃描,並將輸入符逐個移入一個後進先出棧中,邊移入邊分析,一旦棧頂符號串形成某個句型的控制代碼時,(該控制代碼對應某產生式的右部),就
自頂向下分析一個簡單的語音識別系統(六)
上回分析了run_model函式的configuration過程,其中load_placeholder_into_network函式用於構建該語音識別系統中RNN網路的基本結構,本回將分析以下該網路。 1.RNN簡介 人們並不是從每秒鐘他接收到的資訊開始
自頂向下分析一個簡單的語音識別系統(十)
接著上回結束的地方,本回我們來分析sparse_tuple_to_texts函式和ndarray_to_text函式。首先分析sparse_tuple_to_texts函式。 1.sparse_tuple_to_texts函式 給出程式碼如下: def s
自頂向下分析一個簡單的語音識別系統(八)
上回我們說到了get_audio_and_transcript函式、pad_sequences函式和sparse_tuple_from函式等3個函式,本回我們分析這3個函式分別實現了哪些功能。 1.get_audio_and_transcript函式 該
自頂向下分析一個簡單的語音識別系統(五)
本回我們主要分析run_model中的configuration過程的相關函式。 1.run_model函式 第二回我們簡單介紹了run_model函式的結構,現在我們貼出程式碼如下所示: def run_model(self):
自頂向下分析一個簡單的語音識別系統(四)
上回我們分析了模型的初始化,花開兩朵各表一枝,本回我們說一下上回提到的set_dirs.py。該檔案結構如下圖所示: Created with Raphaël 2.1.0get_home_dirget_data_dir/get_conf_dir/get_mod
自頂向下分析一個簡單的語音識別系統(一)
RNN處理帶有時間序列的資料時具有很大的優勢,接下來幾篇文章將介紹如何使用RNN訓練一個簡單的語音識別系統。 主要參考該GitHub專案,https://github.com/silicon-vall
計算機網絡自頂向下方法——可靠數據傳輸原理1(構造可靠數據傳輸協議)
需要 足夠 方向 信息 不發送 可靠的 更多 定時器 基於 TCP向調用它的因特網應用提供所提供的服務模型 數據可以通過一條可靠的信道進行傳輸。借助於可靠的信道,傳輸比特就不會受到損壞或丟失,而且所有數據都是按其發送順序進行交付。 可靠傳輸協議 實現服務模型就需要可靠
【ACM】UVa 489 劊子手遊戲(自頂向下)
【題目】 Hangman Judge是一個猜英文單字的小遊戲(在電子字典中常會看到),遊戲規則如下: 1、答案單字寫在紙上(每個字元一張紙),並且被蓋起來,玩家每次猜一個英文字元(letter)。 2、如果這個英文字元猜中(在答案的英文單字中有出現),被猜中的字元就被翻
計算機網路:自頂向下方法(第七版)Wireshark實驗指南
計算機網路:自頂向下方法(第七版)Wireshark實驗指南 這個資源在網上好像很難找,我歷經千辛萬苦找到之後,在文件的內部發現這些文件其實是免費公開在一個網站上的……,連結如下: http://gaia.cs.umass.edu/wireshark-labs/?tdsourcetag=s_pcqq_ai
深入 自頂向下 逐步求精 面向過程程式設計方法
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
筆記 -《計算機網路:自頂向下方法》 第5章 鏈路層:鏈路、接入網和區域網(0)
第5章 鏈路層:鏈路、接入網和區域網(0) ** “結構” 均為本章知識結構; ** “假設” 均為理想化,抽象的模型; ** “例項” 均為已經投入使用的模型; (結構1) (假設1)同一子網內 傳遞網路層資料報的鏈路層工作流程 &nbs
深入 自頂向下 逐步求精 面向過程程式設計方法
程式設計初學者常常受困於不會想問題:“不知道讓計算機解決這個問題該如何做”。其實,程式設計師的一個基本功是,能夠將複雜的問題分解開來。學會分解任務,因超級大分為大的、中的、小的、超小的,直到能用很直接的方法解決。記住一個很管用的策略:自項向下,逐步求精。不管做何事,都拿這個