java網路程式設計(三)----同步非阻塞nio及reactor模型
很多剛接觸NIO的人,第一眼看到的就是Java相對晦澀的API,比如:Channel,Selector,Socket什麼的;然後就是一坨上百行的程式碼來演示NIO的服務端Demo,所以這裡我們人性化地簡單介紹一下。
NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。
NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。
新增的著兩種通道都支援阻塞和非阻塞兩種模式。阻塞模式使用就像傳統中的支援一樣,比較簡單,但是效能和可靠性都不好;非阻塞模式正好與之相反。
對於低負載、低併發的應用程式,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網路)應用,應使用NIO的非阻塞模式來開發。
java nio基礎實現
緩衝區 Buffer
Buffer是一個物件,包含一些要寫入或者讀出的資料。
在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入資料時,也是寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。
緩衝區實際上是一個數組,並提供了對資料結構化訪問以及維護讀寫位置等資訊。
具體的快取區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的介面:Buffer。通道 Channel
我們對資料的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。
底層的作業系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的對映底層作業系統的API。
Channel主要分兩大類:- SelectableChannel:使用者網路讀寫
- FileChannel:用於檔案操作
後面程式碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
多路複用器 Selector
Selector是Java NIO 程式設計的基礎。
Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。一個Selector可以同時輪詢多個Channel,只需要一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端。
nio使用
回憶BIO模型,之所以需要多執行緒,是因為在進行I/O操作的時候,一是沒有辦法知道到底能不能寫、能不能讀,只能”傻等”,即使通過各種估算,算出來作業系統沒有能力進行讀寫,也沒法在socket.read()和socket.write()函式中返回,這兩個函式無法進行有效的中斷。所以除了多開執行緒另起爐灶,沒有好的辦法利用CPU。
NIO的讀寫函式可以立刻返回,這就給了我們不開執行緒利用CPU的最好機會:如果一個連線不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連線(channel)繼續進行讀寫。
思考一下在Socket網路通訊中有什麼事件:
- 在伺服器端我們需要接收客戶端連線,這裡就有一個接收“Accept”事件;
- 而客戶端連線伺服器,連線有一個“Connect”事件
- 各自進行讀操作時,有一個“read”事件
- 各自進行寫操作時,有一個“write”事件
而上面我們說到連線被抽象為了Channel,這時我們就可以在多路複用器Selector上註冊通道和事件。
- 在伺服器端:ServerSocketChannel.register(Selector, SelectionKey.OP_ACCEPT);
將ServerSocketChannel註冊在Selector並繫結SelectionKey.OP_ACCEPT事件; - 呼叫多路複用器Selector.select();
- 如果channel已經發生了接收客戶端的事件,那麼就能被多路複用器select到,然後獲取到和客戶端連線的Socket,再將這個socket的讀寫事件註冊到Selector上。
- 同樣可以客戶端也是將其他事件註冊到Selector,然後事件被觸發後就可以被select()函式找到
- 最後程式不斷輪詢selector,根據select到的不同事件型別呼叫對應的handler進行處理。
select()函式呼叫的是系統底層函式,在Linux 2.6之前是select、poll,2.6之後是epoll,Windows是IOCP。
虛擬碼如下:
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}
class Channel{
Socket socket;
Event event;//讀,寫或者連線
}
//IO執行緒主迴圈:
class IoThread extends Thread{
public void run(){
Channel channel;
//選擇就緒的事件和對應的連線
while(channel=Selector.select()){
if(channel.event==accept){
//觸發了accept事件的是新連線,
//我們需要為這個新連線註冊讀寫事件
registerNewChannelHandler(channel);
}
if(channel.event==write){
//如果可以寫,則執行寫事件
getChannelHandler(channel).channelWritable(channel);
}
if(channel.event==read){
//如果可以讀,則執行讀事件
getChannelHandler(channel).channelReadable(channel);
}
}
}
//所有channel的對應事件處理器
Map<Channel,ChannelHandler> handlerMap;
}
注意,select是阻塞的,無論是通過作業系統的通知(epoll)還是不停的輪詢(select,poll),這個函式是阻塞的。所以你可以放心大膽地在一個while(true)裡面呼叫這個函式而不用擔心CPU空轉。同時我們也可以通過設定週期的方式呼叫Selector.select(1000)(每一秒select一次)。
實現nio的完整程式碼在最後
nio與bio對比
所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函式,分為等待系統可讀和真正的讀;同理,寫函式分為等待網絡卡可以寫和真正的寫。
需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”,而且這個過程非常快,屬於memory copy,頻寬通常在1GB/s級別以上,可以理解為基本不耗時。
下圖是幾種常見I/O模型的對比:
java的nio便是第二或者第三種形式:
- 第二種:在一個while迴圈裡面不斷每隔一秒檢查selector裡面是否有事件被觸發Selector.select(1000)
- 第三種:Selector.select()會檢查是是否有就緒事件,沒有的話會阻塞直到有就緒事件函式才會返回。
Reactor
圖解:
1. Handle:在網路程式設計中,這裡一般指Socket Handle,即一個網路連線(Connection,在Java NIO中的Channel)。這個Channel註冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,對ServerSocketChannnel可以是CONNECT事件,對SocketChannel可以是READ、WRITE、CLOSE事件等。
2. Synchronous Event Demultiplexer:多路複用器,阻塞等待一系列的Handle中的事件到來,如果阻塞等待返回,即表示在返回的Handle中可以不阻塞的執行返回的事件型別。這個模組一般使用作業系統的select來實現。在Java NIO中用Selector來封裝,當Selector.select()返回時,可以呼叫Selector的selectedKeys()方法獲取Set,一個SelectionKey表達一個有事件發生的Channel以及該Channel上的事件型別。
3. Initiation Dispatcher:Reactor模式的主要模組,事件排程器,通常被稱為reactor,用於管理Event Handler,即EventHandler的容器,用以註冊、移除EventHandler等;另外,它還作為Reactor模式的入口呼叫,在這個模組裡面呼叫Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,當阻塞等待返回時,根據事件發生的Handle將其分發給對應的Event Handler處理,即回撥EventHandler中的handle_event()方法。
4. 定義事件處理方法:handle_event(),以供InitiationDispatcher回撥使用。
需要注意的是:在java nio中我們註冊了建立連線相關的“Accept”“Connect”事件後,再進行讀寫操作要將讀寫事件再註冊到多路複用器,下次讀寫事件發生時才會被select到。這就是傳說中的多路複用IO。
另外Reactor翻譯為“反應”器,名字中”反應“的由來:
“反應”即“倒置”,“控制逆轉”
具體事件處理程式不呼叫反應器,而是由反應器分配一個具體事件處理程式,具體事件處理程式對某個指定的事件發生做出反應;這種控制逆轉又稱為“好萊塢法則”(不要呼叫我,讓我來呼叫你)
nio實現程式碼
一、server端程式碼
public class Server {
private static synchronized void start(int port){
ServerHandle serverHandle = new ServerHandle(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args){
start(12345);
}
}
server處理類ServerHandle:
class ServerHandle implements Runnable {
private Selector m_Selector;
private ServerSocketChannel m_ServerChannel;
private volatile boolean m_Started;
public ServerHandle(int vPort) {
try {
m_Selector = Selector.open();
//伺服器監聽通道
ServerSocketChannel tServerChannel = ServerSocketChannel.open();
//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
tServerChannel.configureBlocking(false);
tServerChannel.bind(new InetSocketAddress(vPort), 1024);
//註冊伺服器通道到selector,監聽ACCEPT事件
tServerChannel.register(m_Selector, SelectionKey.OP_ACCEPT);
m_Started = true;
System.out.println("伺服器已啟動,埠號:" + vPort);
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop() {
m_Started = false;
}
@Override
public void run() {
while (m_Started) {
try {
//無論是否有讀寫事件發生,selector在1s後被喚醒一次
m_Selector.select(1000);
//或者通過下面這種方式:
//會阻塞直到selector裡面有就緒事件
//m_Selector.select();
Set<SelectionKey> tKeys = m_Selector.selectedKeys();
Iterator<SelectionKey> tIterator = tKeys.iterator();
SelectionKey tKey;
//輪詢
while (tIterator.hasNext()) {
tKey = tIterator.next();
tIterator.remove();
try {
handleInput(tKey);
} catch (IOException e) {
if (null != tKey) {
tKey.cancel();
if (null == tKey.channel()) {
tKey.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//完成後關閉select
//關閉Selector會自動關閉裡面的資源
if (null != m_Selector) {
try {
m_Selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey vKey) throws IOException {
if (vKey.isValid()) {
if (vKey.isAcceptable()) {
ServerSocketChannel tServerChannel = (ServerSocketChannel) vKey.channel();
SocketChannel tSocketChannel = tServerChannel.accept();
tSocketChannel.configureBlocking(false);
//拿到新連線註冊讀事件
tSocketChannel.register(m_Selector, SelectionKey.OP_READ);
}
}
//讀訊息
if (vKey.isReadable()) {
SocketChannel tSocketChannel = (SocketChannel) vKey.channel();
//得用buffer
ByteBuffer tBuffer = ByteBuffer.allocate(1024);
int tReadBytes = tSocketChannel.read(tBuffer);
if (0 < tReadBytes) {
tBuffer.flip();
byte[] tBytes = new byte[tBuffer.remaining()];
tBuffer.get(tBytes);
String tExpression = new String(tBytes, "UTF-8");
System.out.println("伺服器收到訊息:" + tExpression);
String tResult = "這句話是伺服器發過來的";
doWrite(tSocketChannel, tResult);
}
//讀取不到訊息關閉資源
else if (0 > tReadBytes) {
//即使key被cancel了他的channel還在的
vKey.cancel();
tSocketChannel.close();
}
}
}
//傳送資料
private void doWrite(SocketChannel vChannel, String vResponse) throws IOException {
byte[] tBytes = vResponse.getBytes("UTF-8");
ByteBuffer tWriteBuffer = ByteBuffer.allocate(tBytes.length);
tWriteBuffer.put(tBytes);
tWriteBuffer.flip();
vChannel.write(tWriteBuffer);
}
}
二、客戶端程式碼:
public class Client {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(null != clientHandle) clientHandle.stop();
clientHandle = new ClientHandle(ip,port);
new Thread(clientHandle,"Server").start();
}
//向伺服器傳送訊息
public static boolean sendMsg(String msg) throws Exception{
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) throws Exception {
start();
while(Client.sendMsg(new Scanner(System.in).nextLine()));
}
}
客戶端處理類ServerHandle:
class ClientHandle implements Runnable {
private String m_Host;
private int m_Port;
private Selector m_Selector;
private SocketChannel m_SocketChannel;
private volatile boolean m_Started;
ClientHandle(String vIp, int vPort) {
m_Host = vIp;
m_Port = vPort;
try {
m_Selector = Selector.open();
m_SocketChannel = SocketChannel.open();
m_SocketChannel.configureBlocking(false);
m_Started = true;
} catch (IOException e) {
e.printStackTrace();
}
}
void stop() {
m_Started = false;
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
System.out.println("客戶端連線失敗");
}
while (m_Started) {
try {
m_Selector.select(1000);
Set<SelectionKey> tKeys = m_Selector.selectedKeys();
Iterator<SelectionKey> tIterator = tKeys.iterator();
SelectionKey tKey;
while (tIterator.hasNext()) {
tKey = tIterator.next();
tIterator.remove();
try {
handleInput(tKey);
} catch (IOException e) {
if (null != tKey) {
tKey.cancel();
if (null != tKey.channel()) {
tKey.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//關閉selector會關閉它裡面的所有資源
if (null != m_Selector) {
try {
m_Selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey vKey) throws IOException {
if (vKey.isValid()) {
SocketChannel socketChannel = (SocketChannel) vKey.channel();
//如果是連線事件
if (vKey.isConnectable()) {
if (socketChannel.finishConnect())
System.out.println("客戶端連線上伺服器端");
else System.exit(1);
}
//讀訊息
if (vKey.isReadable()) {
//建立ByteBuffer,並開闢一個1M的緩衝區
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取請求碼流,返回讀取到的位元組數
int readBytes = socketChannel.read(buffer);
//讀取到位元組,對位元組進行編解碼
if (0 < readBytes) {
//將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作
buffer.flip();
//根據緩衝區可讀位元組數建立位元組陣列
byte[] bytes = new byte[buffer.remaining()];
//將緩衝區可讀位元組陣列複製到新建的陣列中
buffer.get(bytes);
String result = new String(bytes, "UTF-8");
System.out.println("客戶端收到訊息:" + result);
}
//鏈路已經關閉,釋放資源
else if (0 > readBytes) {
vKey.cancel();
socketChannel.close();
}
}
}
}
private void doWrite(SocketChannel channel, String request) throws IOException {
//將訊息編碼為位元組陣列
byte[] bytes = request.getBytes();
//根據陣列容量建立ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//將位元組陣列複製到緩衝區
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//傳送緩衝區的位元組陣列
channel.write(writeBuffer);
}
private void doConnect() throws IOException {
//客戶端請求連線上伺服器,並註冊連線事件
m_SocketChannel.connect(new InetSocketAddress(m_Host, m_Port));
m_SocketChannel.register(m_Selector, SelectionKey.OP_CONNECT);
}
void sendMsg(String msg) throws Exception {
//傳送了資料的話就註冊讀事件
m_SocketChannel.register(m_Selector, SelectionKey.OP_READ);
doWrite(m_SocketChannel, msg);
}
}