通過例項理解Java網路IO模型
網路IO模型及分類
網路IO模型是一個經常被提到的問題,不同的書或者部落格說法可能都不一樣,所以沒必要死摳字眼,關鍵在於理解。
Socket連線
不管是什麼模型,所使用的socket連線都是一樣的。
以下是一個典型的應用伺服器上的連線情況。客戶的各種裝置通過Http協議與Tomcat程序互動,Tomcat需要訪問Redis伺服器,它與Redis伺服器也建了好幾個連線。雖然客戶端與Tomcat建的是短連線,很快就會斷開,Tomcat與Redis是長連線,但是它們本質上都是一樣的。
建立一個Socket後,就是"本地IP+port與遠端IP+port"的一個配對,這個Socket由應用程序呼叫作業系統的系統呼叫建立,在核心空間會有一個與之對應的結構體,而應用程式拿到的是一個檔案描述符(File Describer),就跟開啟一個普通的檔案一樣,可以讀寫。不同的程序有自己的檔案描述符空間,比如程序1中有個socket的fd為100,程序2中也有一個socket的fd為100,它們對應的socket是不一樣的(當然也有可能一樣,因為socket也可以共享)。
Socket是全雙工的,可以同時讀和寫。
對於不同的應用場景,選用的網路IO模型以及其它方面的選項都不一樣。
例如針對客戶端的http 請求,我們一般使用短連線,因為客戶太多,同時使用App的客戶可能很多,但是同一時刻傳送請求的客戶端遠少於正在使用的客戶數,如果都建立長連線,記憶體肯定不夠用,所以會用短連線,當然會有http的keep-alive策略,讓一次tcp連線多互動幾次http資料,這樣能減少建鏈。而對於系統內部的應用,例如Tomcat訪問Redis,訪問的機器數有限,如果每次都用短連線,會有太多的損耗用在建鏈上,所以用長連線,可以大大提高效率。
以上說的是長連線和短連線,一般在討論IO模型時不考慮這個,而是考慮的同步非同步,阻塞非阻塞等。而要確定哪種IO模型,也得看場景,對於CPU密集型的應用,例如一次請求需要兩個核不停的100%跑1分鐘,然後返回結果,這種應用使用哪種IO模型都差不多,因為瓶頸在CPU。所以一般是IO密集型的的應用才考慮如何調整IO模型以獲取最大的效率,最典型的就是Web應用,還有像Redis這種應用。
同步非同步、阻塞非阻塞的概念
同步與非同步:描述的是使用者執行緒與核心的互動方式,同步指使用者執行緒發起IO請求後需要等待或者輪詢核心IO操作完成後才能繼續執行;而非同步是指使用者執行緒發起IO請求後仍然繼續執行,當核心IO操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。
阻塞與非阻塞:描述是使用者執行緒呼叫核心IO操作的方式,阻塞是指IO操作需要徹底完成後才返回到使用者空間;而非阻塞是指IO操作被呼叫後立即返回給使用者一個狀態值,無需等到IO操作徹底完成。
以read函式呼叫來說明不同的IO模式。從對端讀取資料分為兩個階段
(1)資料從裝置到核心空間(圖中等待資料到達)
(2)資料從核心空間到使用者空間(圖中資料拷貝)
以下阻塞IO,非阻塞IO,IO多路複用,都是同步IO,最後是非同步IO。這個地方可能不好理解,總之同步IO必須是執行緒呼叫了讀寫函式後,一直阻塞等,或者輪詢查結果,而非同步IO,調完讀寫函式後立刻返回,操作完成後作業系統主動告訴執行緒。
阻塞IO
阻塞IO是指呼叫了read後,必須等待資料到達,並且複製到了使用者空間,才能返回,否則整個執行緒一直在等待。
所以阻塞IO的問題就是,執行緒在讀寫IO的時候不能幹其它的事情。
非阻塞IO
非阻塞IO在呼叫read後,可以立刻返回,然後問作業系統,資料有沒有在核心空間準備好,如果準備好了,就可以read出來了。因為不知道什麼時候準備好,要保證實時性,就得不斷的輪詢。
IO多路複用(非阻塞IO)
在使用非阻塞IO的時候,如果每個執行緒訪問網路後都不停的輪詢,那麼這個執行緒就被佔用了,那跟阻塞IO也沒什麼區別了。每個執行緒都輪詢自己的socket,這些執行緒也不能幹其它的事情。
如果能有一個專門的執行緒去輪詢所有的socket,如果資料準備好,就找一個執行緒處理,這就是IO多路複用。當然輪詢的執行緒也可以不用找其他執行緒處理,自己處理就行,例如redis就是這樣的。
IO多路複用,能夠讓一個或幾個執行緒去管理很多個(可以成千上萬)socket連線,這樣連線數就不再受限於系統能啟動的執行緒數。
我們把select輪詢抽出來放在一個執行緒裡, 使用者執行緒向其註冊相關socket或IO請求,等到資料到達時通知使用者執行緒,則可以提高使用者執行緒的CPU利用率.這樣, 便實現了非同步方式。
這其中用了Reactor設計模式。
非同步IO
真正的非同步IO需要作業系統更強的支援。 IO多路複用模型中,資料到達核心後通知使用者執行緒,使用者執行緒負責從核心空間拷貝資料; 而在非同步IO模型中,當用戶執行緒收到通知時,資料已經被作業系統從核心拷貝到使用者指定的緩衝區內,使用者執行緒直接使用即可。
非同步IO用了Proactor設計模式。
常見的Web系統裡很少使用非同步IO,本文不做過多的探討。
接下來通過一個簡單的java版redis說明各種IO模型。
實戰
接下來我會編寫一個簡單的java版的Redis,它只有get和set功能,並且只支援字串,只是為了演示各種IO模型,其中一些異常處理之類的做的不到位。
1.阻塞IO+單執行緒+短連線
這種做法只用於寫HelloWorld程式,在這裡主要為了除錯以及把一些公共的類提出來。
首先寫一個Redis介面
package org.ifool.niodemo.redis;
public interface RedisClient {
public String get(String key);
public void set(String key,String value);
public void close();
}
另外,有個工具類,用於拿到請求資料後,處理請求,並返回結果,還有一些byte轉String,String轉byte,在byte前面新增長度等一些函式,供後續使用。
輸入是get|key或者set|key|value,輸出為0|value或者1|null或者2|bad command。
package org.ifool.niodemo.redis;
import java.util.Map;
public class Util {
//把一個String前邊加上一個byte,表示長度
public static byte[] addLength(String str) {
byte len = (byte)str.length();
byte[] ret = new byte[len+1];
ret[0] = len;
for(int i = 0; i < len; i++) {
ret[i+1] = (byte)str.charAt(i);
}
return ret;
}
//根據input返回一個output,操作快取, prefixLength為true,則在前面加長度
//input:
//->get|key
//->set|key|value
//output:
//->errorcode|response
// ->0|response set成功或者get有值
// ->1|response get的為null
// ->2|bad command
public static byte[] processRequest(Map<String,String> cache, byte[] request, int length, boolean prefixLength) {
if(request == null) {
return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
}
String req = new String(request,0,length);
Util.log_debug("command:"+req);
String[] params = req.split("\\|");
if( params.length < 2 || params.length > 3 || !(params[0].equals("get") || params[0].equals("set"))) {
return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
}
if(params[0].equals("get")) {
String value = cache.get(params[1]);
if(value == null) {
return prefixLength ? addLength("1|null") : "1|null".getBytes();
} else {
return prefixLength ? addLength("0|"+value) : ("0|"+value).getBytes();
}
}
if(params[0].equals("set") && params.length >= 3) {
cache.put(params[1],params[2]);
return prefixLength ? addLength("0|success"): ("0|success").getBytes();
} else {
return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
}
}
public static int LOG_LEVEL = 0; //0 info 1 debug
public static void log_debug(String str) {
if(LOG_LEVEL >= 1) {
System.out.println(str);
}
}
public static void log_info(String str) {
if(LOG_LEVEL >= 0) {
System.out.println(str);
}
}
}
服務端程式碼如下,在建立服務端ServerSocket的時候,傳入埠8888, backlog的作用是客戶端建立連線時服務端沒法立即處理,能夠等待的佇列長度。服務端程式碼
package org.ifool.niodemo.redis.redis1;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RedisServer1 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888,10);
byte[] buffer = new byte[512];
while(true) {
//接受客戶端連線請求
Socket clientSocket = null;
clientSocket = serverSocket.accept();
System.out.println("client address:" + clientSocket.getRemoteSocketAddress().toString());
//讀取資料並且操作快取,然後寫回資料
try {
//讀資料
InputStream in = clientSocket.getInputStream();
int bytesRead = in.read(buffer,0,512);
int totalBytesRead = 0;
while(bytesRead != -1) {
totalBytesRead += bytesRead;
bytesRead = in.read(buffer,totalBytesRead,512-totalBytesRead);
}
//操作快取
byte[] response = Util.processRequest(cache,buffer,totalBytesRead,false);
Util.log_debug("response:"+new String(response));
//寫回資料
OutputStream os = clientSocket.getOutputStream();
os.write(response);
os.flush();
clientSocket.shutdownOutput();
} catch (IOException e) {
System.out.println("read or write data exception");
} finally {
try {
clientSocket.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
}
客戶端程式碼如下:
package org.ifool.niodemo.redis.redis1;
import org.ifool.niodemo.redis.RedisClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class RedisClient1 implements RedisClient {
public static void main(String[] args) {
RedisClient redis = new RedisClient1("127.0.0.1",8888);
redis.set("123","456");
String value = redis.get("123");
System.out.print(value);
}
private String ip;
private int port;
public RedisClient1(String ip, int port) {
this.ip = ip;
this.port = port;
}
public String get(String key) {
Socket socket = null;
try {
socket = new Socket(ip, port);
} catch(IOException e) {
throw new RuntimeException("connect to " + ip + ":" + port + " failed");
}
try {
//寫資料
OutputStream os = socket.getOutputStream();
os.write(("get|"+key).getBytes());
socket.shutdownOutput(); //不shutdown的話對端會等待read
//讀資料
InputStream in = socket.getInputStream();
byte[] buffer = new byte[512];
int offset = 0;
int bytesRead = in.read(buffer);
while(bytesRead != -1) {
offset += bytesRead;
bytesRead = in.read(buffer, offset, 512-offset);
}
String[] response = (new String(buffer,0,offset)).split("\\|");
if(response[0].equals("2")) {
throw new RuntimeException("bad command");
} else if(response[0].equals("1")) {
return null;
} else {
return response[1];
}
} catch(IOException e) {
throw new RuntimeException("network error");
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void set(String key, String value) {
Socket socket = null;
try {
socket = new Socket(ip, port);
} catch(IOException e) {
throw new RuntimeException("connect to " + ip + ":" + port + " failed");
}
try {
OutputStream os = socket.getOutputStream();
os.write(("set|"+key+"|"+value).getBytes());
os.flush();
socket.shutdownOutput();
InputStream in = socket.getInputStream();
byte[] buffer = new byte[512];
int offset = 0;
int bytesRead = in.read(buffer);
while(bytesRead != -1) {
offset += bytesRead;
bytesRead = in.read(buffer, offset, 512-offset);
}
String bufString = new String(buffer,0,offset);
String[] response = bufString.split("\\|");
if(response[0].equals("2")) {
throw new RuntimeException("bad command");
}
} catch(IOException e) {
throw new RuntimeException("network error");
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void close() {
}
}
2.阻塞IO+多執行緒+短連線
一般應用伺服器用的都是這種模型,主執行緒一直阻塞accept,來了一個連線就交給一個執行緒,繼續等待連線,然後這個處理執行緒讀寫完後負責關閉連線。
服務端程式碼
package org.ifool.niodemo.redis.redis2;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
public class RedisServer2 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
public static void main(String[] args) throws IOException {
//用於處理請求的執行緒池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(200, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
ServerSocket serverSocket = new ServerSocket(8888,1000);
while(true) {
//接受客戶端連線請求
Socket clientSocket = serverSocket.accept();
Util.log_debug(clientSocket.getRemoteSocketAddress().toString());
//讓執行緒池處理這個請求
threadPool.execute(new RequestHandler(clientSocket));
}
}
}
class RequestHandler implements Runnable{
private Socket clientSocket;
public RequestHandler(Socket socket) {
clientSocket = socket;
}
public void run() {
byte[] buffer = new byte[512];
//讀取資料並且操作快取,然後寫回資料
try {
//讀資料
InputStream in = clientSocket.getInputStream();
int bytesRead = in.read(buffer,0,512);
int totalBytesRead = 0;
while(bytesRead != -1) {
totalBytesRead += bytesRead;
bytesRead = in.read(buffer,totalBytesRead,512-totalBytesRead);
}
//操作快取
byte[] response = Util.processRequest(RedisServer2.cache,buffer,totalBytesRead,false);
Util.log_debug("response:"+new String(response));
//寫回資料
OutputStream os = clientSocket.getOutputStream();
os.write(response);
os.flush();
clientSocket.shutdownOutput();
} catch (IOException e) {
System.out.println("read or write data exception");
} finally {
try {
clientSocket.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
客戶端程式碼,程式碼跟前邊的沒啥變化,只是這次我加了一個多執行緒的讀寫,10個執行緒每個執行緒讀寫10000次。
public static void main(String[] args) {
final RedisClient redis = new RedisClient1("127.0.0.1",8888);
redis.set("123","456");
String value = redis.get("123");
System.out.print(value);
redis.close();
System.out.println(new Timestamp(System.currentTimeMillis()));
testMultiThread();
System.out.println(new Timestamp(System.currentTimeMillis()));
}
public static void testMultiThread() {
Thread[] threads = new Thread[10];
for(int i = 0; i < 10; i++) {
threads[i] = new Thread(new Runnable() {
public void run() {
RedisClient redis = new RedisClient2("127.0.0.1",8888);
for(int j=0; j < 300; j++) {
Random rand = new Random();
String key = String.valueOf(rand.nextInt(1000));
String value = String.valueOf(rand.nextInt(1000));
redis.set(key,value);
String value1 = redis.get(key);
}
}
});
threads[i].start();
}
for(int i = 0; i < 10; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
用這種方式,在10個併發不停讀寫的情況下,寫10000次,出現了一些沒法連線的異常,如下:
java.net.NoRouteToHostException: Can't assign requested address
查了下跟系統引數配置,mac上不知道怎麼調就沒調,改成讀寫300次的時候沒報錯,大約用1s鍾。
3.阻塞IO+多執行緒+長連線
用短連線的時候,我們可以用inputstream.read() == -1來判斷讀取結束,但是用長連線時,資料是源源不斷的,有可能有粘包或者半包問題,我們需要能從流中找到一次請求的開始和結束。有多種方式,例如使用固定長度、固定分隔符、在前面加長度等方法。此處使用前邊加長度的方法,在前面放一個byte,表示一次請求的長度,byte最大是127,所以請求長度不應大於127個位元組。
由於我們客戶端訪問的方式是寫完請求後,等待服務端返回資料,等待期間該socket不會被其它人寫,所以不存在粘包的問題,只存在半包的問題。有些請求方式可能是寫完後在未等待服務端返回就允許其它執行緒寫,那樣就可能有半包。
一般客戶端用長連線的時候,都是建一個連線池,用的時候上鎖獲取連線,我們在這個地方直接讓一個執行緒持有一個連線一個讀寫,這樣減少了執行緒切換與上鎖的開銷,能實現更大的吞吐量。
客戶端程式碼這次發生了較大變化。
package org.ifool.niodemo.redis.redis3;
import org.ifool.niodemo.redis.RedisClient;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.sql.Timestamp;
import java.util.Random;
public class RedisClient3 implements RedisClient {
public static void main(String[] args) {
RedisClient redis = new RedisClient3("127.0.0.1",8888);
redis.set("123","456");
String value = redis.get("123");
System.out.print(value);
redis.close();
System.out.println(new Timestamp(System.currentTimeMillis()));
testMultiThread();
System.out.println(new Timestamp(System.currentTimeMillis()));
}
public static void testMultiThread() {
Thread[] threads = new Thread[10];
for(int i = 0; i < 10; i++) {
threads[i] = new Thread(new Runnable() {
public void run() {
RedisClient redis = new RedisClient3("127.0.0.1",8888);
for(int j=0; j < 50; j++) {
Random rand = new Random();
String key = String.valueOf(rand.nextInt(1000));
String value = String.valueOf(rand.nextInt(1000));
redis.set(key,value);
String value1 = redis.get(key);
}
}
});
threads[i].start();
}
for(int i = 0; i < 10; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private String ip;
private int port;
private Socket socket;
public RedisClient3(String ip, int port) {
this.ip = ip;
this.port = port;
try {
socket = new Socket(ip, port);
} catch(IOException e) {
throw new RuntimeException("connect to " + ip + ":" + port + " failed");
}
}
public String get(String key) {
try {
//寫資料,前邊用一個byte儲存長度
OutputStream os = socket.getOutputStream();
String cmd = "get|"+key;
byte length = (byte)cmd.length();
byte[] data = new byte[cmd.length()+1];
data[0] = length;
for(int i = 0; i < cmd.length(); i++) {
data[i+1] = (byte)cmd.charAt(i);
}
os.write(data);
os.flush();
//讀資料,第一個位元組是長度
InputStream in = socket.getInputStream();
int len = in.read();
if(len == -1) {
throw new RuntimeException("network error");
}
byte[] buffer = new byte[len];
int offset = 0;
int bytesRead = in.read(buffer,0,len);
while(offset < len) {
offset += bytesRead;
bytesRead = in.read(buffer, offset, len-offset);
}
String[] response = (new String(buffer,0,offset)).split("\\|");
if(response[0].equals("2")) {
throw new RuntimeException("bad command");
} else if(response[0].equals("1")) {
return null;
} else {
return response[1];
}
} catch(IOException e) {
throw new RuntimeException("network error");
} finally {
}
}
public void set(String key, String value) {
try {
//寫資料,前邊用一個byte儲存長度
OutputStream os = socket.getOutputStream();
String cmd = "set|"+key + "|" + value;
byte length = (byte)cmd.length();
byte[] data = new byte[cmd.length()+1];
data[0] = length;
for(int i = 0; i < cmd.length(); i++) {
data[i+1] = (byte)cmd.charAt(i);
}
os.write(data);
os.flush();
InputStream in = socket.getInputStream();
int len = in.read();
if(len == -1) {
throw new RuntimeException("network error");
}
byte[] buffer = new byte[len];
int offset = 0;
int bytesRead = in.read(buffer,0,len);
while(offset < len) {
offset += bytesRead;
bytesRead = in.read(buffer, offset, len-offset);
}
String bufString = new String(buffer,0,offset);
Util.log_debug(bufString);
String[] response = bufString.split("\\|");
if(response[0].equals("2")) {
throw new RuntimeException("bad command");
}
} catch(IOException e) {
throw new RuntimeException("network error");
} finally {
}
}
public void close() {
try {
socket.close();
} catch(IOException ex) {
ex.printStackTrace();
}
}
}
服務端建立一個連線,就由一個執行緒一直處理這個連線,有資料就處理,沒資料就不處理。這樣的話,每個連線一個執行緒,如果連線數較大,就會有問題。
package org.ifool.niodemo.redis.redis3;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RedisServer3 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
public static void main(String[] args) throws IOException {
//用於處理請求的執行緒池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
ServerSocket serverSocket = new ServerSocket(8888, 10);
byte[] buffer = new byte[512];
while (true) {
//接受客戶端連線請求
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
Util.log_debug(clientSocket.getRemoteSocketAddress().toString());
} catch (IOException e) {
e.printStackTrace();
}
//讓執行緒池處理這個請求
threadPool.execute(new RequestHandler(clientSocket));
}
}
}
class RequestHandler implements Runnable{
private Socket clientSocket;
public RequestHandler(Socket socket) {
clientSocket = socket;
}
public void run() {
byte[] buffer = new byte[512];
//讀取資料並且操作快取,然後寫回資料
try {
while(true) {
//讀資料
InputStream in = clientSocket.getInputStream();
int len = in.read(); //讀取長度
if(len == -1) {
throw new IOException("socket closed by client");
}
int bytesRead = in.read(buffer, 0, len);
int totalBytesRead = 0;
while (totalBytesRead < len) {
totalBytesRead += bytesRead;
bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead);
}
//操作快取
byte[] response = Util.processRequest(RedisServer3.cache,buffer, totalBytesRead,true);
Util.log_debug("response:" + new String(response));
//寫回資料
OutputStream os = clientSocket.getOutputStream();
os.write(response);
os.flush();
}
} catch (IOException e) {
System.out.println("read or write data exception");
} finally {
try {
clientSocket.close();
Util.log_debug("socket closed");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
使用這個方式,10個執行緒連續讀寫10000次,也就是累計訪問20000萬次只需要3s。
4.阻塞IO+單執行緒輪詢+多執行緒處理+長連線(不可行)
多執行緒和長連線大大提高了效率,但是如果連線數太多,那麼需要太多的執行緒,這樣肯定不可行。這樣大部分執行緒即使沒資料也不能幹其它的,就耗在這個連線上了。
我們可不可以讓一個執行緒去負責等待這些socket,有資料了就告訴工作執行緒池。
程式碼如下,加了一個執行緒遍歷已經連線的socket,然後如果socket.getInputStream().available() > 0就通知執行緒池。
這個程式有些情況下能正常工作,但是實際是有問題的,關鍵就在於上面的available函式是阻塞的,每次輪詢所有的socket,都需要挨個等待是否已經有資料了,所以就是序列。在java裡沒法對socket單獨設定非阻塞,必須從NIO才行,如果用C語言是可行的,但是這裡不行。
package org.ifool.niodemo.redis.redis4;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RedisServer4 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
//當前的socket
final public static Set<Socket> socketSet = new HashSet<Socket>(10);
public static void main(String[] args) throws IOException {
//用於處理請求的執行緒池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
ServerSocket serverSocket = new ServerSocket(8888,100);
//啟動一個執行緒用於一直掃描可以讀取資料的socket,並且去掉已經關閉的連線
Thread thread = new Thread(new Runnable() {
public void run() {
//找到可以讀取的socket,處理
while (true) {
synchronized (socketSet) {
Iterator<Socket> it = socketSet.iterator();
while(it.hasNext()) {
Socket socket = it.next();
if (socket.isConnected()) {
try {
if (!socket.isInputShutdown() && socket.getInputStream().available() > 0) {
it.remove();
threadPool.execute(new RequestHandler(socket));
}
} catch (IOException ex) {
System.out.println("socket already closed1");
socketSet.remove(socket);
try {
socket.close();
} catch (IOException e) {
System.out.println("socket already closed2");
}
}
} else {
socketSet.remove(socket);
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
});
thread.start();
while(true) {
//接受客戶端連線請求,把新建的socket加入socketset
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
Util.log_debug("client address:" + clientSocket.getRemoteSocketAddress().toString());
synchronized (socketSet) {
socketSet.add(clientSocket);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class RequestHandler implements Runnable{
private Socket clientSocket;
public RequestHandler(Socket socket) {
clientSocket = socket;
}
public void run() {
byte[] buffer = new byte[512];
//讀取資料並且操作快取,然後寫回資料
try {
//讀資料
InputStream in = clientSocket.getInputStream();
int len = in.read(); //讀取長度
if(len == -1) {
throw new IOException("socket closed by client");
}
int bytesRead = in.read(buffer, 0, len);
int totalBytesRead = 0;
while (totalBytesRead < len) {
totalBytesRead += bytesRead;
bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead);
}
//操作快取
byte[] response = Util.processRequest(RedisServer4.cache,buffer, totalBytesRead,true);
Util.log_debug("response:" + new String(response));
//寫回資料
OutputStream os = clientSocket.getOutputStream();
os.write(response);
os.flush();
synchronized (RedisServer4.socketSet) {
RedisServer4.socketSet.add(clientSocket);
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("read or write data exception");
} finally {
}
}
}
5.IO多路複用+單執行緒輪詢+多執行緒處理+長連線
在上述例子中我們試圖用普通socket實現類似select的功能,在Java裡是不可行的,必須用NIO。我們只需要一個select函式就能輪詢所有的連線是否準備好資料,準備好了就能呼叫執行緒池裡的執行緒處理。
要使用NIO,需要了解ByteBuffer, Channel等內容,比如ByteBuffer設計的就比較麻煩,此處不再展開。
客戶端程式碼暫時不用NIO,還是用原來的,服務端程式碼如下:
package org.ifool.niodemo.redis.redis5;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SyncFailedException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RedisServer5 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
public static void main(String[] args) throws IOException {
//用於處理請求的執行緒池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(8888),1000);
Selector selector = Selector.open();
ssc.configureBlocking(false); //必須設定成非阻塞
ssc.register(selector, SelectionKey.OP_ACCEPT); //serverSocket只關心accept
while(true) {
int num = selector.select();
if(num == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if(key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false); //設定成非阻塞才能監聽
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(512) );
System.out.println("new connection");
}
if(key.isReadable()) {
SocketChannel clientSocketChannel = (SocketChannel)key.channel();
//System.out.println("socket readable");
if(!clientSocketChannel.isConnected()) {
clientSocketChannel.finishConnect();
key.cancel();
clientSocketChannel.close();
System.out.println("socket closed2");
continue;
}
ByteBuffer buffer = (ByteBuffer)key.attachment();
int len = clientSocketChannel.read(buffer);
Socket socket = clientSocketChannel.socket();
if(len == -1) {
clientSocketChannel.finishConnect();
key.cancel();
clientSocketChannel.close();
System.out.println("socket closed1");
} else {
threadPool.execute(new RequestHandler(clientSocketChannel, buffer));
}
}
}
}
}
}
class RequestHandler implements Runnable{
private SocketChannel channel;
private ByteBuffer buffer;
public RequestHandler(SocketChannel channel, Object buffer) {
this.channel = channel;
this.buffer = (ByteBuffer)buffer;
}
public void run() {
//讀取資料並且操作快取,然後寫回資料
try {
int position = buffer.position();
//切換成讀模式,以便把第一個位元組到長度讀出來
buffer.flip();
int len = buffer.get(); //讀取長度
if(len > position + 1) {
buffer.position(position);
buffer.limit(buffer.capacity());
return;
}
byte[] data = new byte[len];
buffer.get(data,0,len);
//操作快取
byte[] response = Util.processRequest(RedisServer5.cache,data, len,true);
Util.log_debug("response:" + new String(response));
buffer.clear();
buffer.put(response);
buffer.flip();
channel.write(buffer);
buffer.clear();
} catch (IOException e) {
System.out.println("read or write data exception");
} finally {
}
}
}
自己寫NIO程式有很多坑,上面的程式碼有時候會出問題,有些異常沒處理好。但是10個執行緒不停寫10000次也是3s多。
IO多路複用+Netty
使用java的原生NIO寫程式很容易出問題,因為API比較複雜,而且有很多異常要處理,比如連線的關閉,粘包半包等,使用Netty這種成熟的框架會比較好寫。
Netty常用的執行緒模型如下圖所示,mainReactor負責監聽server socket,accept新連線,並將建立的socket分派給subReactor。subReactor負責多路分離已連線的socket,讀寫網路資料,對業務處理功能,其扔給worker執行緒池完成。通常,subReactor個數上可與CPU個數等同。
客戶端程式碼如下所示。其中的兩個NioEventLoop就是上面的mainReactor和subReactor。第一個引數為0,是使用預設執行緒數的意思,這樣mainReactor一般是1個,subReactor一般與CPU核相通。
我們這裡只有boss(mainReactor)和worker(subReactor),一般情況下,還有一個執行緒池,用於處理真正的業務邏輯,因為worker是用來讀取和解碼資料的,如果在這個worker裡處理業務邏輯,比如訪問資料庫,是不合適的。只是我們這個場景就類似於Redis,所以沒有用另一個執行緒池。
package org.ifool.niodemo.redis.redis6;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class RedisServer6 {
//全域性快取
public static Map<String,String> cache = new ConcurrentHashMap<String,String>();
public static void main(String[] args) throws IOException, InterruptedException {
//用於處理accept事件的執行緒池
EventLoopGroup bossGroup = new NioEventLoopGroup(0, new ThreadFactory() {
AtomicInteger index = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r,"netty-boss-"+index.getAndIncrement());
}
});
//用於處理讀事件的執行緒池
EventLoopGroup workerGroup = new NioEventLoopGroup(0, new ThreadFactory() {
AtomicInteger index = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r,"netty-worker-"+index.getAndIncrement());
}
});
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,50)
.childHandler(new ChildChannelHandler());
ChannelFuture future = bootstrap.bind(8888).sync();
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**這個類就是供netty-worker呼叫的**/
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
//先通過一個LengthFieldBasedFrameDecoder分包,再傳給RequestHandler
socketChannel.pipeline()
.addLast(new RedisDecoder(127,0,1))
.addLast(new RequestHandler());
}
}
class RequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
int len = buf.readableBytes() - 1;
int lenField = buf.readByte();
if(len != lenField) {
ByteBuf resp = Unpooled.copiedBuffer("2|bad cmd".getBytes());
ctx.write(resp);
}
byte[] req = new byte[len];
buf.readBytes(req,0,len);
byte[] response = Util.processRequest(RedisServer6.cache,req,len,true);
ByteBuf resp = Unpooled.copiedBuffer(response);
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
class RedisDecoder extends LengthFieldBasedFrameDecoder {
public RedisDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
}
可以看到,執行後產生了一個boss執行緒和10個worker執行緒。
用netty寫的就比較穩定了,10個寫成不停寫10000次也是3秒,但是不用擔心執行緒數了。
總結
網路IO模型只看概念什麼的很難理解,只有通過例項才能理解的更深刻。我們通過回答一個問題來總結一下:
為什麼redis能通過單執行緒實現上萬的tps?
我們把redis處理請求的過程細化:
(1)讀取原始資料
(2)解析並處理資料(處理業務邏輯)
(3)寫會返回資料
讀取資料是通過IO多路複用實現,而在底層,是通過epoll實現,epoll相比於select(不是Java NIO的select,是Linux的select)主要有以下兩個優點:
一是提高了遍歷socket的效率,即時有上百萬個連線,它也只會遍歷有事件的連線,而select需要全部遍歷一遍。
二是通過mmap實現了核心態與使用者態的共享記憶體,也就是資料從網絡卡到達複製到核心空間,不需要複製到使用者空間了,所以使用epoll,如果發現有讀事件,那麼記憶體裡的資料也準備好了,不需要拷貝。
通過以上可以得出,讀取資料是十分快的。
接下來就是處理資料,這才是能使用單執行緒的本質原因。redis的業務邏輯是純記憶體操作,耗時是納秒級的,所以事件可以忽略不計。假如我們是一個複雜的web應用,業務邏輯涉及到讀資料庫,呼叫其它模組,那麼是不能用單執行緒的。
同樣,寫資料也是通過epoll共享記憶體,只要把結果計算後放到使用者記憶體,然後通知作業系統就可以了。
所以,redis能單執行緒支撐上萬tps的前提就是每個請求都是記憶體操作,事件都特別短,但凡有一次請求慢了,就會導致請求阻塞。假設99.99%的請求響應時間都在1ms以內,而0.01%的請求時間為1s,那麼單執行緒模型在處理1s請求的時候,剩餘1ms的請求也都得排隊