【Netty原始碼分析】資料讀取過程
首先客戶端連線到服務端時服務端會開啟一個執行緒,不斷的監聽客戶端的操作。
這個執行緒的執行操作在NioEventLoop的run方法中,其實操作是在processSelectedKeys中,監聽是否進行讀操作
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } } }
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
判斷selectedKeys陣列中是否有值,其實就是read,write或accept事件private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } } }
主要判斷int readyOps = k.readyOps();拿到的值是否是SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
如果int readyOps = k.readyOps();拿到的值是SelectionKey.OP_READ | SelectionKey.OP_ACCEPT就執行unsafe.read();
具體實現在AbstractNioByteChannel執行read操作
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
}
讀取操作是在doReadBytes(byteBuf)中,具體實現是在NioSocketChannel中
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
接下來的操作是在AbstractByteBuf的writeBytes中進行。@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureAccessible();
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
}
return writtenBytes;
}
讀取資料操作是在setBytes函式中進行的,一般實現是在PooledUnsafeDirectByteBuf類中 @Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
try {
return in.read(tmpBuf);
} catch (ClosedChannelException ignored) {
return -1;
}
}
在這裡我們可以看到操作in.read(tmpBuf),這個操作就是NIO的讀取資料操作了,in是SocketChannel物件,將資料讀取到快取中去,完成資料的讀取過程。相關推薦
【Netty原始碼分析】資料讀取過程
首先客戶端連線到服務端時服務端會開啟一個執行緒,不斷的監聽客戶端的操作。這個執行緒的執行操作在NioEventLoop的run方法中,其實操作是在processSelectedKeys中,監聽是否進行讀操作protected void run() { for
【Netty原始碼分析】傳送資料過程
future.channel().writeAndFlush("Hello Netty Server ,I am a common client"); 呼叫AbstractChannel的writeAndFlush函式@Override public ChannelFutu
【Netty原始碼分析】Netty服務端bind埠過程
這一篇部落格我們介紹一下Netty服務端繫結埠的過程,我們通過跟蹤程式碼一直到NIO原生繫結埠的操作。繫結埠操作ChannelFuture future = serverBootstrap.bind(8080).sync();AbstractBootstrap中bind操作p
【NCNN原始碼分析】1.基本資料型別
對於NCNN而言,核心在於網路的前向推理過程(Inference),其主要資料型別為mat,該資料型別以類的形式定義在src/mat.h中,其中包含了mat的建構函式、解構函式、常見的運算過程。 #if
【NCNN原始碼分析】3.基本資料結構分析
對於NCNN,在網路層傳遞的過程中,進行資料流動的方式是通過自定義的blob實現的,對於blob通過生產者編號和消費者編號進行定義,producer表示輸出該blob的網路層編號,consumers表示
【Pig原始碼分析】談談Pig的資料模型
1. 資料模型 Schema Pig Latin表示式操作的是relation,FILTER、FOREACH、GROUP、SPLIT等關係操作符所操作的relation就是bag,bag為tuple的集合,tuple為有序的field列表集合,而field表示資料塊(A field is a piece of
【go原始碼分析】go原始碼之slice原始碼分析
Go 語言切片是對陣列的抽象。 Go 陣列的長度不可改變,與陣列相比切片的長度是不固定的,可以追加元素,在追加時可能使切片的容量增大。 len() 和 cap() 函式 切片是可索引的,並且可以由 len() 方法獲取長度。
【go原始碼分析】go原始碼之list原始碼分析
本文針對go 1.11版本,路徑src/container/list/list.go 資料結構 Element結構體 Value 前驅 後繼 // Element is an element of a linked list. type Element st
【轉】【java原始碼分析】Map中的hash演算法分析
全網把Map中的hash()分析的最透徹的文章,別無二家。 2018年05月09日 09:08:08 閱讀數:957 你知道HashMap中hash方法的具體實現嗎?你知道HashTable、ConcurrentHashMap中hash方法
【ArcGIS|空間分析】資料互動(Google Earth與ArcGIS,ENVI與ArcGIS)
文章目錄 Google Earth資料與ArcGIS資料互動 ENVI資料與ArcGIS資料互動 ENVI資料匯入ArcGIS ArcGIS資料匯入ENVI Google Earth資料與ArcGIS資料互動
【spring原始碼分析】IOC容器初始化(二)
前言:在【spring原始碼分析】IOC容器初始化(一)中已經分析了匯入bean階段,本篇接著分析bean解析階段。 1.解析bean程式呼叫鏈 同樣,先給出解析bean的程式呼叫鏈: 根據程式呼叫鏈,整理出在解析bean過程中主要涉及的類和相關方法。 2.解析bean原始碼分
【Go 原始碼分析】從 sort.go 看排序演算法的工程實踐
go version go1.11 darwin/amd64file: src/sort/sort.go 排序演算法有很多種類,比如快排、堆排、插入排序等。各種排序演算法各有其優劣性,在實際生產過程中用到的排序演算法(或者說 Sort 函式)通常是由幾種排序演算法組
【PHP7原始碼分析】如何理解PHP虛擬機器(一)
順風車運營研發團隊 李樂 1.從物理機說起 虛擬機器也是計算機,設計思想和物理機有很多相似之處; 1.1馮諾依曼體系結構 馮·諾依曼是當之無愧的數字計算機之父,當前計算機都採用的是馮諾依曼體系結構;設計思想主要包含以下幾個方面: 指令和資料不加區別混合儲存在同一個儲
【Mybatis原始碼分析】13-記一次PageHelper reasonable引數使用不當造成的死迴圈
問題描述及原因 使用Mybatis PageHelper外掛做了表的分頁查詢,要求查詢符合某一條件的所有記錄做處理,就寫了一個迭代器在while迴圈裡對每條記錄做處理,直到符合條件的記錄都處理完程式返回。程式碼如下 public class ReconPaymentI
【Tomcat9原始碼分析】元件與框架概述
1 元件與框架介紹 Server:代表整個Catalina Servlet容器,可以包含一個或多個Service Service:包含Connector和Container的集合,Service用適當的Connector接收使用者的請求,
【E2LSH原始碼分析】p穩定分佈LSH演算法初探
上一節,我們分析了LSH演算法的通用框架,主要是建立索引結構和查詢近似最近鄰。這一小節,我們從p穩定分佈LSH(p-Stable LSH)入手,逐漸深入學習LSH的精髓,進而靈活應用到解決大規模資料的
【MyBatis原始碼分析】TypeHandler解析屬性配置元素詳述及相關列舉使用高階進階
TypeHandler解析接著看一下typeHandlerElement(root.evalNode("typeHandlers"));方法,這句讀取的是<configuration>下的<typeHandlers>節點,程式碼實現為:private
【Java8原始碼分析】併發包-AtomicInteger
AtomicInteger類是實現了原子操作的Integer,以往對於保證int、double、float等基礎型別的運算原子性,需要採用加鎖的方式。但是為了一個簡單的運算操作採用鎖,在多執行緒競爭嚴重的情況下,會導致效能降低,所以在java1.5推出了Atom
【proxyConfig原始碼分析】【proxyTargetClass】【exposeProxy】
本文轉載自shysheng:spring proxyConfig原始碼分析 我們知道,ProxyConfig是所有產生Spring AOP代理物件的基類,它是一個配置源,主要為其AOP代理物件工廠實現類提供基本的配置屬性。 它一共包含5個屬性,本文主要就是分析這5個屬性在產生代理物件過
【OkHttp3原始碼分析】(一)Request的execute
簡單使用OkHttp3 閱讀本文需要對OkHttp3的使用有一定了解。 首先我們先看看如何簡單進行一個get請求的Request。 Request qqRequest = new Request.Builder()