1. 程式人生 > >java 7之AIO對完成埠(IOCP)的實現

java 7之AIO對完成埠(IOCP)的實現

鑑於網上關於java7之AIO(只能在windows 作業系統下才能實現功能)實現的例項少而且講述的不夠詳細,特此寫這篇部落格供大家借鑑,希望能幫助和我一樣苦逼的java程式設計師們,也希望高手能指出部落格描述不當之處

廢話少說,直接進入主題:先貼段伺服器程式碼,程式碼沒進行封裝,可能比較亂

public class AIOServer {
     public final static int PORT = 9888;   //埠號
     private AsynchronousServerSocketChannel server; //此類相當一一個伺服器的SOCKET,不過它的實現是非同步通訊形式
     public Map<String, List<AsynchronousSocketChannel>> map=
                                         new HashMap<String, List<AsynchronousSocketChannel>>(); 

    public AIOServer() throws IOException {
            //   AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 1);
//         server = AsynchronousServerSocketChannel.open(group);
//         //通過setOption配置Socket
//         server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//         server.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
//         //繫結到指定的主機,埠
//         server.bind(new InetSocketAddress("寫一個Ip地址", PORT)); 

           //上面註釋掉的寫法,經我測試,會報IO異常

           server = AsynchronousServerSocketChannel.open().bind(
                          new InetSocketAddress("192.168.1.100",PORT)); 

     }
       
    public void startWithCompletionHandler() throws InterruptedException,
                         ExecutionException, TimeoutException {
               System.out.println("Server listen on " + PORT);


                /*****************************************************
                 * 主要功能:註冊事件和事件完成後的處理器
                 * 實現原理:當一有使用者連線到伺服器時,會呼叫accept方法,
                 * 此方法是一個非阻塞方法並和一個完成埠(即處理器-CompletionHandler)進行繫結的方法
                 * 當用戶連線成功時,完成埠會自動呼叫completed方法,這步由作業系統完成
                 * 要實現能連線多使用者,必須在completed方法中在迴圈呼叫一次accept方法
                 * 程式碼如下:server.accept(null, this);
                 * 關於IOCP的詳細實現原理參見C++中的IOCP
                 *****************************************************/              

               server.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {

                          //ByteBuffer:接收資料的緩衝區,這裡初始化大小為65535
                          ByteBuffer buffer = ByteBuffer.allocate(65535);

                         
                          @Override
                          public void completed(AsynchronousSocketChannel result,Object attachment) {

                                       //AsynchronousSocketChannel相當唯一標示客戶的socket
                                       //這裡需要加個final關鍵字才能讓SocketChannel物件在方法read中可見
                                       final AsynchronousSocketChannel SocketChannel=result;

                                      //再次向處理器投遞一個連線請求
                                       server.accept(null, this);
      
                                try {

                                        //清空緩衝區,這步不能省
                                        buffer.clear();

                                        result.read(buffer, null, new CompletionHandler<Integer, Object>() {

                                                @Override
                                                public void completed(Integer result1,Object result2) {
                                                               System.out.println("result1:"+result1);
           
                                                                 if(result1==-1){

                                                                       //這裡面可進行對失去連線的客戶端進行刪除操作
                                                                  } 
                                                 
                                                                 if(result1!=-1){
                                                                         buffer.flip();

                                                                         //接收到的資料緩衝區轉位元組數,此後可對這個陣列進行操作
                                                                 }

                                                   //完成接收操作之後,必須清空緩衝區,不然會出現死迴圈
                                                  buffer.clear();

                                                  //再次向處理器投遞一個接收資料請求
                                                  SocketChannel.read(buffer, null, this);
                        
                                                 }
         

                                                @Override
                                                public void failed(Throwable exc,Object result2) {
                                                         exc.printStackTrace();
                                                         System.out.println("failed: " + exc);

                                                             //完成接收操作之後,必須清空緩衝區,不然會出現死迴圈
                                                             buffer.clear();

                                                            //再次向處理器投遞一個接收資料請求
                                                            SocketChannel.read(buffer, null, this);


                                                 }
         
                                        });
        
       
        
                             }catch (Exception e) {
                                       e.printStackTrace();
                             }finally {

                              }
                      }

                     @Override
                    public void failed(Throwable exc, Object attachment) {
                              exc.printStackTrace();
                              System.out.println("failed: " + exc);

                             //再次向處理器投遞一個連線請求
                                       server.accept(null, this);
                       }
              });
            // 這裡必須 保證主執行緒的存活

             System.in.read();

     }

    public static void main(String args[]) throws Exception {
                new AIOServer().startWithCompletionHandler();
     }
}

注:置於伺服器往客戶端傳送資料操作,可把連線成功之後的客戶端AsynchronousSocketChannel result儲存到一個List集合中,再在接收使用者資料操作中另開一個執行緒,佔門處理髮送事件,也開獨立開一個執行緒去監控是否有客戶端傳送資料過來,然後對list進行遍歷群發,置於實現分組傳送,描述比較麻煩,就不詳述,傳送資料的方法是result.write();write有好幾種過載方式,詳細見官網api,api地址http://docs.oracle.com/javase/7/docs/api/