1. 程式人生 > >java 服務端多線線程非阻塞實現05

java 服務端多線線程非阻塞實現05

@param 緩沖區溢出 nbsp oca span temp class 二層 字符串

/**
 * 非阻塞IO多線線程服務端
 * 當一個任務進入多線程,這個任務線程需要處理接收信息、發送信息、因而發生I/O阻塞問題
 * 利用selector可以實現異步
 *
 */
public class EchoServer02 {
    //輪詢器,處理IO阻塞問題
    private Selector selector = null;
    private ServerSocketChannel serverSocketChannel = null;
    private int port = 8088;
    private Charset charset = Charset.forName("
GBK");//編碼方式 public EchoServer02() throws IOException{ //創建一個Selector對象 selector = Selector.open(); //這個方法沒有與任何本地端口綁定,並且處於阻塞模式; serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作於非阻塞模式
serverSocketChannel.configureBlocking(false);// IO異步處理 //把服務器與本地端口綁定 serverSocketChannel.socket().bind(new InetSocketAddress(port));//綁定服務器端口 System.out.println("服務器已啟動"); } public void service() throws Exception{ /*SeverSocketChannel或Socket類通過register()方法向Selector註冊事件時, register()方法會創建一個SelectionKey對象, 這個SelectionKey對象是用來跟蹤註冊事件的句柄。 在SelectionKey對象的有效時間,Selector會一直監控與SelectionKey對象相關的事件, 如果事件發生,就會把SelectionKey對象加入seleected-keys集合中。
*/ //將ServerSocketChannel註冊到Selector上 //只要ServerSocketChannel及SocketChannel向Selector註冊了特定的事件,Selector就會監控這些事件是否發送 //SelecitonKey.OP_ACCEPT:接收連接就緒事件,表示服務器監聽到了客戶連接,服務器可以接收這個鏈接了。常量值為16 //這個客戶SocketChannel會被Selector監控到 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //獲取Selector中的SelectionKey數量 while(selector.select() > 0){//第一層循環 //相關事件已經被Selector捕獲的SelectionKey的集合。 Set readKeys = selector.selectedKeys(); Iterator iterator = readKeys.iterator(); while(iterator.hasNext()){//第二層循環 SelectionKey key = null; try{//處理SelectionKey key = (SelectionKey)iterator.next();//取出一個SelectionKey //把SelectionKey從Selector的selected-集合中刪除 iterator.remove(); if(key.isAcceptable()){//處理連接就緒事件 //獲得與SelectionKey關聯的ServerSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //獲得與客戶連接的SocketChannel,這個SocketChannel默認情況是阻塞的 SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("接收到客戶的連接,來自:"+socketChannel.socket().getInetAddress() +":"+socketChannel.socket().getPort()); //把SocketChannel設置為非阻塞模式, socketChannel.configureBlocking(false); //創建一個用於存放用戶發送來的數據的緩沖區 ByteBuffer buffer = ByteBuffer.allocate(1024); //把SocketChannel向Selector註冊就讀事件和就緒事件,且關聯了一個buffer附件 socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer); } if(key.isReadable()){//處理讀就緒事件 receive(key); } if(key.isWritable()){//處理寫就緒事件 send(key); } }catch (Exception e) { e.printStackTrace(); try { if(key != null){ //使這個Seleciton失效 //使得Selector不再監控這個SelectionKey感興趣的事件 key.cancel(); key.channel().close();//關閉這個SelectionKey關聯的SocketChannel } } catch (Exception e2) { e2.printStackTrace(); } } } } } public void send(SelectionKey key) throws IOException{ //獲得與SelectionKey關聯的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); //把極限設為位置,把位置設為0 buffer.flip(); //按照GBK編碼,把buffer中的字節轉換為字符串 String data = decode(buffer); //如果還沒有讀到一行數據就返回 if(data.indexOf("\r\n") == -1){ return ; } //截取一行數據 String outputData = data.substring(0, data.indexOf("\n")+1); System.out.println(outputData); //把輸出的字符串安裝GBK編碼,轉換為字節,把它放入outputBuffer ByteBuffer outputBuffer = encode("ehco:" + outputData); //輸出outputBuffer中所有的字節 while(outputBuffer.hasRemaining()){ socketChannel.write(outputBuffer); //把outputData字符串按照GBK編碼,轉換為字節 ,把它放入ByteBuffer中 ByteBuffer temp = encode(outputData); //把buffer的位置設為temp的極限 buffer.position(temp.limit()); //刪除buffer中已經處理的數據 buffer.compact(); //如果已經輸出了字符串"bye\r\n",就使SelectionKey失效,並關閉SocketChannel if(outputData.equals("bye\r\n")){ key.cancel(); socketChannel.close(); System.out.println("關閉與客戶的連接"); } } } /** * receive()方法把讀入的數據都放在一個ByteBuffer中, * send()方法就從這個ByteBuffer中取出數據 * 如果ByteBuffer中還沒有一行字符串,就什麽不做,直接退出send()方法 * @param key * @throws IOException */ public void receive(SelectionKey key) throws IOException{ //獲得與SelectionKey關聯的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); //創建一個ByteBuffer,用於存放讀到的數據 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的極限設為容量 buffer.limit(buffer.capacity()); //把readBuff中內容拷貝到buffer中, //假設buffer容量足夠大,不會出現緩沖區溢出異常 buffer.put(readBuff); } public String decode(ByteBuffer buffer){//解碼,將字節轉換為字符串的過程 CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } public ByteBuffer encode(String str){//編碼,將字符串轉換為字節 return charset.encode(str); } public static void main(String[] args) throws Exception{ EchoServer02 server02 = new EchoServer02(); server02.service(); } }

java 服務端多線線程非阻塞實現05