淺談NIO和Epoll的實現原理
什麼是NIO
NIO又叫New/Non-blocking IO,這個概念基本人人都聽過,但是不一定每個人都懂他它的執行的原理。
這裡我們來探討這個問題,先用一個例子解釋一下BIO到底阻塞了哪裡。
/** * 這是一個單執行緒BIOServer * @author endless * @create 2020-03-23 */ public class BioServerDemo { public static void main(String[] args) throws IOException { // 建立ServerSocket,並繫結埠 ServerSocket serverSocket = new ServerSocket(9999); System.out.println("服務啟動成功"); while (true) { Socket socket = serverSocket.accept(); System.out.println("連線成功"); System.out.println("準備接收資料"); byte[] bytes = new byte[1024]; socket.getInputStream().read(bytes); System.out.println("接收到了資料:" + new String(bytes));} } } /** * BIO client * * @author endless * @create 2020-03-23 */ public class BioClientDemo { public static void main(String[] args) throws IOException { // 連線Server Socket socket = new Socket("127.0.0.1", 9999); System.out.println("連線成功"); Scanner scanner = new Scanner(System.in); // 迴圈等待輸入訊息 while (true) { String str = scanner.next(); // 約定退出口令 if ("exit".equalsIgnoreCase(str)) { socket.close(); System.exit(0); } socket.getOutputStream().write(str.getBytes()); socket.getOutputStream().flush(); } } }
先執行Server
命令列列印服務啟動成功,此時並無客戶端連線,所以連線成功並未列印,說明程式被阻塞在了serverSocket.accept()方法
此時執行Client,Server列印日誌連線成功和準備接收資料,此時Client尚未傳送資料,Server被阻塞在
socket.getInputStream().read(bytes)上,因此其他客戶端無法進行連線。
在Client輸入Hello回車,此時Server列印接收到了資料:Hello,說明客戶端的連線傳送過來資料了,此時服務端執行緒才解阻塞,在這
個情況下,這個Server沒有辦法處理併發,同時期只能處理一個連線。
那麼BIO是如何實現併發呢?答案也很明顯,就是使用多執行緒,我們對Server進行一些小改動。
/** * 這是一個BIOServer * @author endless * @create 2020-03-23 */ public class BioServerDemo { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(9999); System.out.println("服務啟動成功"); while (true) { Socket socket = serverSocket.accept(); new Thread(()->{ System.out.println("連線成功"); System.out.println("準備接收資料"); byte[] bytes = new byte[1024]; try { socket.getInputStream().read(bytes); } catch (IOException e) { e.printStackTrace(); } System.out.println("接收到了資料:" + new String(bytes)); }).start(); } } }
使用子執行緒來對接收到的Socket進行處理,這樣每個連線都被阻塞在單獨的執行緒上,就可以實現併發訪問Server。
總結:BIO的阻塞有兩個地方:accept()和read(),並且BIO的併發只能通過多執行緒。
但是這裡會有一個問題,就是如果絕大部分的連線都沒有進行資料傳輸,只是建立了連線,這樣就會產生很多無效的執行緒,而執行緒又
是非常寶貴的稀缺資源,這樣就會白白損失很多的效能,這也是BIO最大的效能瓶頸。
那能不能只用一個執行緒就能實現併發並且處理全部的連線呢?是否能設計一個Api,讓accept和read不再阻塞,使用一個執行緒就能處
理併發連線呢?答案是肯定的,這裡就要用到我們的今天的主角NIO了。
NIO在JDK中被封裝在了一個新的類中,我們先來寫一個例子,這個例子實現了使用單執行緒來處理多連線。
/** * NIO Server Demo * * @author endless * @create 2020-03-23 */ public class NioServerDemo { // 儲存客戶端連線 static List<SocketChannel> channelList = new ArrayList<>(); public static void main(String[] args) throws IOException, InterruptedException { // 建立NIO ServerSocketChannel ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(9998)); // 設定ServerSocketChannel為非阻塞 serverSocket.configureBlocking(false); System.out.println("服務啟動成功"); while (true) { SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { // 如果有客戶端進行連線 System.out.println("連線成功"); // 設定SocketChannel為非阻塞 socketChannel.configureBlocking(false); // 儲存客戶端連線在List中 channelList.add(socketChannel); } // 遍歷連線進行資料讀取 Iterator<SocketChannel> iterator = channelList.iterator(); while (iterator.hasNext()) { SocketChannel o = iterator.next(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); int read = o.read(byteBuffer); // 如果有資料,把資料打印出來 if (read > 0) { System.out.println("接收到訊息:" + new String(byteBuffer.array())); } else if (read == -1) { // 如果客戶端斷開,把socket從集合中去掉 iterator.remove(); System.out.println("客戶端斷開連線"); } } } } }
客戶端可以複用之前的BIO客戶端
執行NIOServer,Server啟動完畢後執行兩個Client,各發送一條訊息進行測試
控制檯顯示兩個連線成功,並且接收到了來自兩個客戶端的訊息,表明Server可以使用單執行緒處理併發連線,這個Api的原理是什麼呢?我們來進一步探究一下。
我們沿著原始碼一路往下找,會找到一個無法下載原始碼的檔案
這裡可以看得出,在Windows系統中,編譯後的程式碼顯示直接返回了WindowsSelectorProvider物件,很顯然,這個物件是在和windows系統核心中的select方法互動,但是Linux中是不是也是這樣呢,我們需要下載一個Linux版本的OpenJDK原始碼來探究一下。下載
OpenJDK原始碼
下載後解壓,Linux原始碼分佈在\openjdk\jdk\src\share和\openjdk\jdk\src\solaris目錄中。
在上圖中可以看出,JDK在不同的系統中採用了不同的實現方案,這裡使用的是EPollSelectorProvider,說明在Linux中,使用的是EPoll來實現的。
我們先來看看serverSocket.configureBlocking(false);到底是如何工作的,沿著原始碼往下找,發現一個本地方法。
這個本地方法的原始碼可以在OpenJDK中找到,如下圖
上圖紅框中的函式就是這個本地方法呼叫的底層C語言的方法,前面的字首是根據JNI呼叫規則新增的,我們知道,在C當中是可以直接呼叫作業系統的Api的,這個方法呼叫了fcntl這個命令,把傳進來的blocking引數設定到了檔案描述符上(檔案描述符可以看作是一個物件,Linux中一切皆檔案,類似於高階語言中的一切皆物件,任何的資料流轉都要通過一個檔案描述符來操作)。
接著看看serverSocket.accept()是如何實現
上圖可以看到,accept方法呼叫了一個native方法accept0
這個accept0方法的描述簡單翻譯一下,就是接受一個新的連線,把給定的檔案描述符引用設定為新的Socket,並將isaa[0]設定為套接字的遠端地址,成功返回1,不成功返回IOStatus.UNAVAILABLE or IOStatus.INTERRUPTED。
繼續探究一下本地方法的原始碼
這裡呼叫了作業系統的accept方法,想知道這個方法文件的同學可以在Linux環境中使用man命令來檢視
man命令可以檢視Linux詳盡的文件,2表示第二章:系統呼叫指令,最後加上你想查的指令方法即可
這裡主要看一下返回值,accept返回一個非負數作為socket的描述符,如果失敗返回-1。通過這個檔案描述符,Java就可以通過native方法呼叫作業系統的API來在Socket中進行資料的傳輸。我們的程式碼用一個List儲存了接收到的Socket,相當於儲存了Socket的檔案描述符,通過一個執行緒輪詢這些檔案描述符即可實現資料通訊和處理新連線,這樣就節約了大量的執行緒資源,但是大家想一想這種模型還有什麼缺陷。
……
是的,如果連線數太多的話,還是會有大量的無效遍歷,例如10000個連線中只有1000個連線時有資料的,但是由於其他9000個連線並沒有斷開,我們還是要每次輪詢都遍歷一萬次,相當於有十分之九的遍歷都是無效的,這顯然不是一個讓人很滿意的狀態。
總結:NIO的非阻塞是由作業系統來完成的,SocketChannel與檔案描述符一一對應,通過遍歷檔案描述符來讀取資料。
什麼是多路複用器
上面的例子還不是Java NIO的完全體,僅僅是將原來的同步阻塞IO優化成了同步非阻塞IO,既然還是同步的,就意味著我們每次遍
歷,還是需要對每個Socket進行一次read操作來檢查是不是有資料過來,都會呼叫系統核心的read指令,只不過是把阻塞變成了非阻
塞,如果無用連線很多的話,那麼絕大部分的read指令都是無意義的,這就會佔用很多的CPU時間。
Linux有select、poll和epoll三個解決方案來實現多路複用,其中的select和poll,有點類似於上面的NIOServerDemo程式,他會把
所有的檔案描述符記錄在一個數組中,通過在系統核心中遍歷來篩選出有資料過來的Socket,只不過是從應用程式中遍歷改成了在核心中
遍歷,本質還是一樣的。
Epoll則使用了事件機制,在複用器中註冊了一個回撥事件,當Socket中有資料過來的時候呼叫,通知使用者處理資訊,這樣就不需要對
全部的檔案描述符進行輪訓了,這就是Epoll對NIO進行的改進。
我們來探究一下在Java中是如何用Epoll實現NIO事件機制的,先對上面的NIOServerDemo再進行改進。
/** * NIO Selector Server Demo * * @author endless * @create 2020-03-23 */ public class NioSelectorServerDemo { public static void main(String[] args) throws IOException, InterruptedException { // 建立NIO ServerSocketChannel ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(9998)); // 設定ServerSocketChannel為非阻塞 serverSocket.configureBlocking(false); // 開啟Selector處理Channel,即建立epoll Selector selector = Selector.open(); // 將ServerSocket註冊到selector用來接收連線 serverSocket.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服務啟動成功"); while (true) { // 阻塞等待需要處理的事件發生 selector.select(); // 獲取selector中註冊的全部事件的 SelectionKey 例項 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 遍歷SelectionKey對事件進行處理 while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 如果是OP_ACCEPT事件,則進行連接獲取和事件註冊 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = server.accept(); socketChannel.configureBlocking(false); // 這裡只註冊了讀事件,如果需要給客戶端傳送資料可以註冊寫事件 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("客戶端連線成功"); } // 如果是OP_READ事件,則進行讀取和列印 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); int read = socketChannel.read(byteBuffer); // 如果有資料,把資料打印出來 if (read > 0) { System.out.println("接收到訊息:" + new String(byteBuffer.array())); } else if (read == -1) { // 如果客戶端斷開連線,關閉Socket System.out.println("客戶端斷開連線"); socketChannel.close(); } } } } } }
這段程式碼需要關注的點有以下幾個方法:
- Selector.open()
- socketChannel.register()
- selector.select()
接下來就來看看這三個方法究竟做了什麼。
Select.open()
首先呼叫了SelectorProvider的openSelector()方法,這個方法返回一個EPollSelectorImpl例項
EPollSelectorProvider.java
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
EPollSelectorImpl的構造方法中new了一個EPollArrayWrapper例項
EPollSelectorImpl.java
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// 建立管道,用long型別返回管道的兩個檔案描述符。讀端以高32位的形式返回,寫端以低32位的形式返回。
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 建立Epoll檔案描述符,並且建立對映陣列記錄事件
pollWrapper = new EPollArrayWrapper();
// 初始化中斷檔案描述符,把新建立的Epoll註冊到管道的讀檔案描述符上
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
EPollArrayWrapper的構造方法中建立了Epoll例項
EPollArrayWrapper.java
EPollArrayWrapper() throws IOException {
// 建立了Epoll例項,並將它的事件陣列地址記錄下來方便操作
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
...
private native int epollCreate();
Native方法epollCreate()的原始碼,呼叫了核心的epoll_create指令,建立並獲取了一個檔案描述符。
epoll有三個指令,epoll_create、epoll_ctl、epoll_wait,都可以在Linux環境中使用man命令來檢視詳細文件。
EPollArrayWrapper.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
socketChannel.register()
註冊事件時其實並沒有對Epoll進行事件新增,而是在只是把它加入了待新增的容器。
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
// 判斷是否存在事件的key,存在的話就更新,不存在就新建
// 最終都會走到 EPollArrayWrapper#setUpdateEvents方法
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
AbstractSelector.register呼叫實現的子類方法implRegister
EPollSelectorImpl.java
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
呼叫setUpdateEvents寫入待更新事件容器
EPollArrayWrapper.java
/**
* Add a file descriptor
*/
void add(int fd) {
// 強制初始update events為0,因為他可能會被之前註冊kill掉
synchronized (updateLock) {
assert !registered.get(fd);
setUpdateEvents(fd, (byte)0, true);
}
}
/**
* 設定檔案描述符的待更新事件到 eventsHigh 中。
*/
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
selector.select()
呼叫selector.select()時,會將剛才註冊的待更新事件繫結到檔案描述符上,然後進入阻塞狀態等待事件回撥。
EPollSelectorImpl.java
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
EPollArrayWrapper.java
int poll(long timeout) throws IOException {
// 將上一步待更新的時間進行註冊
updateRegistrations();
// 進入阻塞狀態,等待事件發生
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 呼叫native方法進行事件繫結
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
總結:到此為止,我們已經粗略的把整個NIO呼叫流程都梳理了一遍,Java呼叫了作業系統的Api來建立Socket,獲取到Socket的檔案描述符,再建立一個Selector物件,對應作業系統的EPoll描述符,將獲取到的Socket連線的檔案描述符的事件繫結到Selector對應的EPoll檔案描述符上,進行事件的非同步通知,這樣就實現了使用一條執行緒,並且不需要太多的無效的遍歷,將事件處理交給了作業系統核心,大大提高了效率。
EPoll指令詳解
epoll_create
int epoll_create(int size);
建立一個epoll例項,並返回一個非負數作為檔案描述符,用於對epoll介面的所有後續呼叫。引數size代表可能會容納size個描述符,但size不是一個最大值,只是提示作業系統它的數量級,現在這個引數基本上已經棄用了。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
使用檔案描述符epfd引用的epoll例項,對目標檔案描述符fd執行op操作。
引數epfd表示epoll對應的檔案描述符,引數fd表示socket對應的檔案描述符。
引數op有以下幾個值:EPOLL_CTL_ADD:註冊新的fd到epfd中,並關聯時間eventEPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;EPOLL_CTL_DEL:從epfd中移除fd,並且忽略掉繫結的event,這時event可以為null;
引數event是一個結構體。
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
events有很多可選值,這裡只舉例最常見的幾個:
EPOLLIN :表示對應的檔案描述符是可讀的(close也會發送訊息);EPOLLOUT:表示對應的檔案描述符是可寫的;EPOLLERR:表示對應的檔案描述符發生了錯誤;
成功則返回0,失敗返回-1
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待檔案描述符epfd上的事件。
epfd是Epoll對應的檔案描述符,events表示呼叫者所有可用事件的集合,maxevents表示最多等到多少個事件就返回,timeout是超時時間。
完
作業系統的IO還涉及到零拷貝和直接記憶體兩部分的知識,也是作業系統提高效能的利器,將在以後的文章中進行探討。