1. 程式人生 > >Reactor模式簡單實現與理解

Reactor模式簡單實現與理解

Class Reactor:

/** 
 * 
 * 經典的網路服務在每個執行緒中完成對資料的處理:
 * 但這種模式在使用者負載增加時,效能將下降非常的快。
 * 系統執行的效能瓶頸通常在I/O讀寫,包括對埠和檔案的操作上,過去,在打 開一個I/O通道後,
 * read()將一直等待在埠一邊讀取位元組內容,如果沒有內容進來,read()也是傻傻的等,
 * 這會影響我們程式繼續做其他事情,那 麼改進做法就是開設執行緒,讓執行緒去等待,但是這樣做也是相當耗費資源(傳統socket通訊伺服器設計模式) 的。
 * 
 * Java NIO非堵塞技術實際是採取Reactor模式,或者說是Observer模式為我們監察I/O埠,
 * 如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個執行緒死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。 
 * NIO 有一個主要的類Selector,這個類似一個觀察者 ,只要我們把需要探知的 socketchannel告訴Selector,
 * 我們接著做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些 Key,就會獲得我們剛剛註冊過的socketchannel,
 * 然後,我們從這個Channel中讀取資料,放心,包準能夠讀到,接著我們可以處理這些資料。 
 * Selector內部原理實際是在做一個對所註冊的channel的輪詢訪問 ,不斷的輪詢(目前就這一個演算法),一旦輪詢到一個channel有所註冊的事情發生,
 * 比如資料來了,他就會站起來報告,交出一把鑰匙,
 * 讓我們通過這把鑰匙(SelectionKey 表示 SelectableChannel 在 Selector 中的註冊的標記。 )來讀取這個channel的內容。 
 * 
 * 反應器模式 
 * 用於解決多使用者訪問併發問題 
 * 舉個例子:餐廳服務問題 
 * 傳統執行緒池做法:來一個客人(請求)去一個服務員(執行緒) 
 * 反應器模式做法:當客人點菜的時候,服務員就可以去招呼其他客人了,等客人點好了菜,直接招呼一聲“服務員” 
 */
public class Reactor implements Runnable{ //同步事件分離器,阻塞等待Handles中的事件發生 public final Selector selector; public final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException{ selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); InetSocketAddress inetSocketAddress=new
InetSocketAddress(InetAddress.getLocalHost(),port); serverSocketChannel.socket().bind(inetSocketAddress); /* * ServerSocketChannel可以設定成非阻塞模式。 * 在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。 * 因此,需要檢查返回的SocketChannel是否是null. */ serverSocketChannel.configureBlocking(false
); /* * 向selector註冊該serverSocketChannel * SelectionKey.OP_ACCEPT —— 接收連線繼續事件,表示伺服器監聽到了客戶連線,伺服器可以接收這個連線了 * SelectionKey.OP_CONNECT —— 連線就緒事件,表示客戶與伺服器的連線已經建立成功 * SelectionKey.OP_READ —— 讀就緒事件,表示通道中已經有了可讀的資料,可以執行讀操作了(通道目前有資料,可以進行讀操作了) * SelectionKey.OP_WRITE —— 寫就緒事件,表示已經可以向通道寫資料了(通道目前可以用於寫操作) * 這裡 注意,下面兩種,SelectionKey.OP_READ ,SelectionKey.OP_WRITE , * 1.當向通道中註冊SelectionKey.OP_READ事件後,如果客戶端有向快取中write資料,下次輪詢時,則會 isReadable()=true; * 2.當向通道中註冊SelectionKey.OP_WRITE事件後,這時你會發現當前輪詢執行緒中isWritable()一直為ture,如果不設定為其他事件 */ SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); /* * 利用selectionKey的attache功能繫結Acceptor 如果有事情,觸發Acceptor * 該selectionKey為serverSocketChannel的selectionKey * attach的為new Acceptor(this) * 用於void dispatch(SelectionKey key)中獲取key.attachment() * 將被本類中的run()方法的selectionKeys.clear(); 清空 * 第二次的selector.selectedKeys();獲取到的將會是socketChannel註冊的OP_READ的selectionKey(attach的為SocketReadHandler) */ selectionKey.attach(new Acceptor(this)); } @Override public void run() { try { while(!Thread.interrupted()){ selector.select(); Set<SelectionKey> selectionKeys= selector.selectedKeys(); Iterator<SelectionKey> it=selectionKeys.iterator(); //Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。 while(it.hasNext()){ SelectionKey selectionKey=it.next(); /* * 第一次觸發此方法,獲取(OP_ACCEPT)selectionKey.attachment()為new Acceptor(this) * Acceptor run()方法裡面為 new SocketReadHandler(reactor.selector, socketChannel); * 在SocketReadHandler構造方法中將socketChannel register到Selector,返回selectionKey * 再將該socketChannel的selectionKey attach(this); this represent new出來的SocketReadHandler * * 第二次觸發此方法,獲取(OP_READ)selectionKey.attachment()為new出來的SocketReadHandler * SocketReadHandler run()方法裡面為 socketChannel.read(inputBuffer); 實際處理的邏輯程式碼 */ dispatch(selectionKey); /* * selectionKeys.clear(); 將selectionKeys清空, * Acceptor類中的run()>>>new SocketReadHandler()構造方法中的 selector.wakeup()>>>再次觸發selector.select(); * Set<SelectionKey> selectionKeys= selector.selectedKeys(); * 第一次遍歷的selectionKeys裡面只有一個就是OP_ACCEPT的selectionKey,attachment為Acceptor物件 * 第二次遍歷的selectionKeys裡面只有一個就是OP_READ的selectionKey,attachment為SocketReadHandler物件 */ selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } } /** * 執行Acceptor或SocketReadHandler */ void dispatch(SelectionKey key) { //本例第一次此方法執行key為serverSocketChannel註冊的selectionKey,key.attachment()為Acceptor物件 //本例第二次此方法執行key為socketChannel註冊的selectionKey,key.attachment()為SocketReadHandler物件 Runnable r = (Runnable)(key.attachment()); if (r != null){ /* * 第一次執行Acceptor的run(),run()方法將呼叫SocketReadHandler構造方法 * 在SocketReadHandler構造方法中將向selector註冊socketChannel,並attach(SocketReadHandler物件) * 第二次執行SocketReadHandler的run(),處理具體邏輯程式碼 */ r.run(); } } }

Class Acceptor:

public class Acceptor implements Runnable{  
    private Reactor reactor;  
    public Acceptor(Reactor reactor){  
        this.reactor=reactor;  
    }  
    @Override  
    public void run() {  
        try {  
            /*
             * ServerSocketChannel可以設定成非阻塞模式。
             * 在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。
             * 因此,需要檢查返回的SocketChannel是否是null.
             */
            SocketChannel socketChannel=reactor.serverSocketChannel.accept(); 

            /*
             * 呼叫Handler來處理channel
             * 在SocketReadHandler構造方法中將socketChannel register到Selector,返回selectionKey
             * 再將該socketChannel的selectionKey attach(this); this represent new出來的SocketReadHandler
             */
            if(socketChannel!=null) new SocketReadHandler(reactor.selector, socketChannel);  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}

Class SocketReadHandler :

public class SocketReadHandler implements Runnable{  
    private SocketChannel socketChannel;  
    public SocketReadHandler(Selector selector,
            SocketChannel socketChannel) throws IOException{  
        this.socketChannel=socketChannel;  
        socketChannel.configureBlocking(false);  

        SelectionKey selectionKey=socketChannel.register(selector, 0);  

        //將該socketChannel註冊的SelectionKey繫結為本SocketReadHandler 
        //下一步有事件觸發時,將呼叫本類的run方法。    
        //參看dispatch(SelectionKey key)    
        selectionKey.attach(this);  

        //同時將SelectionKey標記為可讀,以便讀取。    
        selectionKey.interestOps(SelectionKey.OP_READ);    
        selector.wakeup();  
    }  

    /** 
     * 處理讀取資料 
     */  
    @Override  
    public void run() {  
        ByteBuffer inputBuffer=ByteBuffer.allocate(1024);  
        inputBuffer.clear();  
        try {  
            socketChannel.read(inputBuffer);  
            //啟用執行緒池 處理這些request  
            //requestHandle(new Request(socket,btt));   
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}