基於AIO的CS聊天室
阿新 • • 發佈:2018-11-23
所謂AIO,即非同步IO(Asynchronous I/O):應用程式發起IO操作後立即返回,實際的IO操作交由作業系統在背景完成。應用程式事先為該操作設定一個監聽器(類似於一個訊號處理函式),當系統IO操作完成(或失敗)時,監聽器會被回呼,並在其中執行相應的後續操作。
監聽器一般使用CompletionHandler。
伺服器端程式碼:
package com.nanhao.AIOTest;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.*;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Minimal chat-room server built on asynchronous (NIO.2) socket channels.
 *
 * <p>Every accepted client is added to {@link #channelList}; whatever bytes one
 * client sends are relayed to every connected client (including the sender).
 * Completion handlers are dispatched on the 20-thread pool created in
 * {@link #startListen()}, so several handlers may run at the same time; the
 * shared state in this class is written with that in mind.
 */
public class Server {
    /** TCP port the server listens on. */
    static final int PORT = 3000;

    /**
     * Charset name kept for backward compatibility. The server now relays raw
     * bytes and no longer encodes/decodes text itself.
     */
    final static String UTF_8 = "utf-8";

    /**
     * All currently connected clients. A copy-on-write list is used because the
     * list is mutated by accept/failure handlers while other pool threads are
     * iterating over it to broadcast.
     */
    static List<AsynchronousSocketChannel> channelList = new CopyOnWriteArrayList<>();

    /** Channel group backing the server socket; set by {@link #startListen()}. */
    private AsynchronousChannelGroup channelGroup;

    /**
     * Opens the listening socket on 127.0.0.1:{@link #PORT} and arms the first
     * asynchronous accept. Returns immediately; connections are then handled on
     * the channel group's thread pool.
     *
     * @throws IOException if the channel group or server socket cannot be
     *                     created, or the address cannot be bound
     */
    public void startListen() throws IOException, Exception {
        // Thread pool on which completion handlers are dispatched.
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        // Channel group bound to that pool.
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
        try {
            // Server socket owned by the group, bound to the loopback address.
            AsynchronousServerSocketChannel serverSocketChannel =
                    AsynchronousServerSocketChannel.open(group)
                            .bind(new InetSocketAddress("127.0.0.1", PORT));
            this.channelGroup = group;
            // Accept the first client asynchronously; the handler re-arms itself.
            serverSocketChannel.accept(null, new AcceptHandler(serverSocketChannel));
        } catch (IOException | RuntimeException e) {
            // Do not leak the group (and the executor it owns) when start-up fails.
            try {
                group.shutdownNow();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Starts the server and keeps the process alive until the channel group
     * terminates.
     */
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.startListen();
        // NOTE(review): startListen() returns right after arming the accept, and the
        // group's internal dispatcher threads appear to be daemon threads on common
        // JDKs, so without this wait the JVM may exit before any client connects.
        // Confirm on the target JDK; the wait is harmless either way.
        server.channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    /**
     * Relays {@code message} to every connected client.
     *
     * <p>Writes to a single channel are serialized by locking on the channel,
     * because starting a write while another one is still pending on the same
     * channel raises {@code WritePendingException}. A client whose write fails
     * is dropped from the room.
     */
    private static void broadcast(byte[] message) {
        for (AsynchronousSocketChannel peer : Server.channelList) {
            try {
                synchronized (peer) {
                    ByteBuffer outgoing = ByteBuffer.wrap(message);
                    // A single write may be partial; loop until everything is sent.
                    while (outgoing.hasRemaining()) {
                        peer.write(outgoing).get();
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop broadcasting on this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (Exception e) {
                e.printStackTrace();
                dropClient(peer);
            }
        }
    }

    /** Removes {@code client} from the room and closes its channel (best effort). */
    private static void dropClient(AsynchronousSocketChannel client) {
        Server.channelList.remove(client);
        try {
            client.close();
        } catch (IOException ignored) {
            // The peer is already gone; there is nothing useful to do with this error.
        }
    }

    /**
     * Handles completed accepts: registers the new client, re-arms the accept
     * for the next client and starts the read loop for this one.
     */
    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel serverSocketChannel;

        public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel) {
            this.serverSocketChannel = serverSocketChannel;
        }

        @Override
        public void completed(final AsynchronousSocketChannel client, Object attachment) {
            Server.channelList.add(client);
            // Re-arm the accept so the next client can connect.
            serverSocketChannel.accept(null, this);

            // One buffer per connection: this handler instance is shared by every
            // accept, so a buffer stored in a field would be shared by all clients.
            final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            client.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
                @Override
                public void completed(Integer bytesRead, Object attachment) {
                    if (bytesRead < 0) {
                        // End of stream: the peer closed the connection. Re-arming the
                        // read here would complete with -1 again, forever.
                        dropClient(client);
                        return;
                    }
                    byteBuffer.flip();
                    // Relay the raw bytes. Decoding each chunk separately would corrupt
                    // a multi-byte UTF-8 character split across two reads.
                    byte[] message = new byte[byteBuffer.remaining()];
                    byteBuffer.get(message);
                    byteBuffer.clear();
                    // This is a chat room, so the message goes to every channel.
                    broadcast(message);
                    // The broadcast may have dropped this client if writing to it failed.
                    if (client.isOpen()) {
                        client.read(byteBuffer, null, this);
                    }
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("讀取資料失敗: " + exc);
                    dropClient(client);
                }
            });
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("連線失敗 :" + exc);
        }
    }
}