Reactor模式簡單實現與理解
阿新 • • 發佈:2019-01-07
Class Reactor:
/**
*
* 經典的網路服務在每個執行緒中完成對資料的處理:
* 但這種模式在使用者負載增加時,效能將下降非常的快。
* 系統執行的效能瓶頸通常在I/O讀寫,包括對埠和檔案的操作上,過去,在打 開一個I/O通道後,
* read()將一直等待在埠一邊讀取位元組內容,如果沒有內容進來,read()也是傻傻的等,
* 這會影響我們程式繼續做其他事情,那 麼改進做法就是開設執行緒,讓執行緒去等待,但是這樣做也是相當耗費資源(傳統socket通訊伺服器設計模式) 的。
*
* Java NIO非堵塞技術實際是採取Reactor模式,或者說是Observer模式為我們監察I/O埠,
* 如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個執行緒死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。
* NIO 有一個主要的類Selector,這個類似一個觀察者 ,只要我們把需要探知的 socketchannel告訴Selector,
* 我們接著做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些 Key,就會獲得我們剛剛註冊過的socketchannel,
* 然後,我們從這個Channel中讀取資料,放心,包準能夠讀到,接著我們可以處理這些資料。
* Selector內部原理實際是在做一個對所註冊的channel的輪詢訪問 ,不斷的輪詢(目前就這一個演算法),一旦輪詢到一個channel有所註冊的事情發生,
* 比如資料來了,他就會站起來報告,交出一把鑰匙,
* 讓我們通過這把鑰匙(SelectionKey 表示 SelectableChannel 在 Selector 中的註冊的標記。 )來讀取這個channel的內容。
*
* 反應器模式
* 用於解決多使用者訪問併發問題
* 舉個例子:餐廳服務問題
* 傳統執行緒池做法:來一個客人(請求)去一個服務員(執行緒)
* 反應器模式做法:當客人點菜的時候,服務員就可以去招呼其他客人了,等客人點好了菜,直接招呼一聲“服務員”
*/
public class Reactor implements Runnable{
//同步事件分離器,阻塞等待Handles中的事件發生
public final Selector selector;
public final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException{
selector=Selector.open();
serverSocketChannel=ServerSocketChannel.open();
InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);
serverSocketChannel.socket().bind(inetSocketAddress);
/*
* ServerSocketChannel可以設定成非阻塞模式。
* 在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。
* 因此,需要檢查返回的SocketChannel是否是null.
*/
serverSocketChannel.configureBlocking(false );
/*
* 向selector註冊該serverSocketChannel
* SelectionKey.OP_ACCEPT —— 接收連線繼續事件,表示伺服器監聽到了客戶連線,伺服器可以接收這個連線了
* SelectionKey.OP_CONNECT —— 連線就緒事件,表示客戶與伺服器的連線已經建立成功
* SelectionKey.OP_READ —— 讀就緒事件,表示通道中已經有了可讀的資料,可以執行讀操作了(通道目前有資料,可以進行讀操作了)
* SelectionKey.OP_WRITE —— 寫就緒事件,表示已經可以向通道寫資料了(通道目前可以用於寫操作)
* 這裡 注意,下面兩種,SelectionKey.OP_READ ,SelectionKey.OP_WRITE ,
* 1.當向通道中註冊SelectionKey.OP_READ事件後,如果客戶端有向快取中write資料,下次輪詢時,則會 isReadable()=true;
* 2.當向通道中註冊SelectionKey.OP_WRITE事件後,這時你會發現當前輪詢執行緒中isWritable()一直為ture,如果不設定為其他事件
*/
SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
/*
* 利用selectionKey的attache功能繫結Acceptor 如果有事情,觸發Acceptor
* 該selectionKey為serverSocketChannel的selectionKey
* attach的為new Acceptor(this)
* 用於void dispatch(SelectionKey key)中獲取key.attachment()
* 將被本類中的run()方法的selectionKeys.clear(); 清空
* 第二次的selector.selectedKeys();獲取到的將會是socketChannel註冊的OP_READ的selectionKey(attach的為SocketReadHandler)
*/
selectionKey.attach(new Acceptor(this));
}
@Override
public void run() {
try {
while(!Thread.interrupted()){
selector.select();
Set<SelectionKey> selectionKeys= selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
//Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。
while(it.hasNext()){
SelectionKey selectionKey=it.next();
/*
* 第一次觸發此方法,獲取(OP_ACCEPT)selectionKey.attachment()為new Acceptor(this)
* Acceptor run()方法裡面為 new SocketReadHandler(reactor.selector, socketChannel);
* 在SocketReadHandler構造方法中將socketChannel register到Selector,返回selectionKey
* 再將該socketChannel的selectionKey attach(this); this represent new出來的SocketReadHandler
*
* 第二次觸發此方法,獲取(OP_READ)selectionKey.attachment()為new出來的SocketReadHandler
* SocketReadHandler run()方法裡面為 socketChannel.read(inputBuffer); 實際處理的邏輯程式碼
*/
dispatch(selectionKey);
/*
* selectionKeys.clear(); 將selectionKeys清空,
* Acceptor類中的run()>>>new SocketReadHandler()構造方法中的 selector.wakeup()>>>再次觸發selector.select();
* Set<SelectionKey> selectionKeys= selector.selectedKeys();
* 第一次遍歷的selectionKeys裡面只有一個就是OP_ACCEPT的selectionKey,attachment為Acceptor物件
* 第二次遍歷的selectionKeys裡面只有一個就是OP_READ的selectionKey,attachment為SocketReadHandler物件
*/
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 執行Acceptor或SocketReadHandler
*/
void dispatch(SelectionKey key) {
//本例第一次此方法執行key為serverSocketChannel註冊的selectionKey,key.attachment()為Acceptor物件
//本例第二次此方法執行key為socketChannel註冊的selectionKey,key.attachment()為SocketReadHandler物件
Runnable r = (Runnable)(key.attachment());
if (r != null){
/*
* 第一次執行Acceptor的run(),run()方法將呼叫SocketReadHandler構造方法
* 在SocketReadHandler構造方法中將向selector註冊socketChannel,並attach(SocketReadHandler物件)
* 第二次執行SocketReadHandler的run(),處理具體邏輯程式碼
*/
r.run();
}
}
}
Class Acceptor:
public class Acceptor implements Runnable{
private Reactor reactor;
public Acceptor(Reactor reactor){
this.reactor=reactor;
}
@Override
public void run() {
try {
/*
* ServerSocketChannel可以設定成非阻塞模式。
* 在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。
* 因此,需要檢查返回的SocketChannel是否是null.
*/
SocketChannel socketChannel=reactor.serverSocketChannel.accept();
/*
* 呼叫Handler來處理channel
* 在SocketReadHandler構造方法中將socketChannel register到Selector,返回selectionKey
* 再將該socketChannel的selectionKey attach(this); this represent new出來的SocketReadHandler
*/
if(socketChannel!=null) new SocketReadHandler(reactor.selector, socketChannel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Class SocketReadHandler :
public class SocketReadHandler implements Runnable{
private SocketChannel socketChannel;
public SocketReadHandler(Selector selector,
SocketChannel socketChannel) throws IOException{
this.socketChannel=socketChannel;
socketChannel.configureBlocking(false);
SelectionKey selectionKey=socketChannel.register(selector, 0);
//將該socketChannel註冊的SelectionKey繫結為本SocketReadHandler
//下一步有事件觸發時,將呼叫本類的run方法。
//參看dispatch(SelectionKey key)
selectionKey.attach(this);
//同時將SelectionKey標記為可讀,以便讀取。
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
/**
* 處理讀取資料
*/
@Override
public void run() {
ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
inputBuffer.clear();
try {
socketChannel.read(inputBuffer);
//啟用執行緒池 處理這些request
//requestHandle(new Request(socket,btt));
} catch (IOException e) {
e.printStackTrace();
}
}
}