NIO實現非阻塞式Socket通訊
阿新 • • 發佈:2018-12-24
Selector 非阻塞通訊核心
- Selector負責監控所有已經註冊的Channel
- Selector通過SelectionKey來關聯Channel
- 群聊伺服器nio非阻塞式
public class NServer { // 用於檢測所有Channel狀態的Selector private Selector selector = null; static final int PORT = 30000; // 定義實現編碼、解碼的字符集物件 private Charset charset = Charset.forName("UTF-8"); public void init()throws IOException { selector = Selector.open(); // 通過open方法來開啟一個未繫結的ServerSocketChannel例項 ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT); // 將該ServerSocketChannel繫結到指定IP地址 server.bind(isa); // 設定ServerSocket以非阻塞方式工作 server.configureBlocking(false); // 將server註冊到指定Selector物件 server.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { // 依次處理selector上的每個已選擇的SelectionKey for (SelectionKey sk : selector.selectedKeys()) { // 從selector上的已選擇Key集中刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); // ① // 如果sk對應的Channel包含客戶端的連線請求 if (sk.isAcceptable()) // ② { // 呼叫accept方法接受連線,產生伺服器端的SocketChannel //此處Server就一個可以直接上邊的獲取到,可以不通過SelectionKey 獲取 SocketChannel sc = server.accept(); // 設定採用非阻塞模式 sc.configureBlocking(false); // 將該SocketChannel也註冊到selector sc.register(selector, SelectionKey.OP_READ); // 將sk對應的Channel設定成準備接受其他請求 sk.interestOps(SelectionKey.OP_ACCEPT); } // 如果sk對應的Channel有資料需要讀取 if (sk.isReadable()) // ③ { // 獲取該SelectionKey對應的Channel,該Channel中有可讀的資料 SocketChannel sc = (SocketChannel)sk.channel(); // 定義準備執行讀取資料的ByteBuffer ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; // 開始讀取資料 try { while(sc.read(buff) > 0) { buff.flip(); content += charset.decode(buff); } // 列印從該sk對應的Channel裡讀取到的資料 System.out.println("讀取的資料:" + content); // 將sk對應的Channel設定成準備下一次讀取 sk.interestOps(SelectionKey.OP_READ); } // 如果捕捉到該sk對應的Channel出現了異常,即表明該Channel // 對應的Client出現了問題,所以從Selector中取消sk的註冊 catch (IOException ex) { // 從Selector中刪除指定的SelectionKey sk.cancel(); if (sk.channel() != null) { sk.channel().close(); } } // 如果content的長度大於0,即聊天資訊不為空 if (content.length() > 0) { // 遍歷該selector裡註冊的所有SelectionKey for (SelectionKey key : selector.keys()) { // 獲取該key對應的Channel Channel targetChannel = key.channel(); // 如果該channel是SocketChannel物件 if (targetChannel instanceof SocketChannel) { // 將讀到的內容寫入該Channel中 SocketChannel dest = (SocketChannel)targetChannel; dest.write(charset.encode(content)); } } } } } } } public static void main(String[] args) throws IOException { new NServer().init(); } }
- 群聊客戶端
public class NClient { // 定義檢測SocketChannel的Selector物件 private Selector selector = null; static final int PORT = 30000; // 定義處理編碼和解碼的字符集 private Charset charset = Charset.forName("UTF-8"); // 客戶端SocketChannel private SocketChannel sc = null; public void init()throws IOException { selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT); // 呼叫open靜態方法建立連線到指定主機的SocketChannel sc = SocketChannel.open(isa); // 設定該sc以非阻塞方式工作 sc.configureBlocking(false); // 將SocketChannel物件註冊到指定Selector sc.register(selector, SelectionKey.OP_READ); // 啟動讀取伺服器端資料的執行緒 new ClientThread().start(); // 建立鍵盤輸入流 Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { // 讀取鍵盤輸入 String line = scan.nextLine(); // 將鍵盤輸入的內容輸出到SocketChannel中 sc.write(charset.encode(line)); } } // 定義讀取伺服器資料的執行緒 private class ClientThread extends Thread { public void run() { try { while (selector.select() > 0) // ① { // 遍歷每個有可用IO操作Channel對應的SelectionKey for (SelectionKey sk : selector.selectedKeys()) { // 刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); // 如果該SelectionKey對應的Channel中有可讀的資料 if (sk.isReadable()) { // 使用NIO讀取Channel中的資料 SocketChannel sc = (SocketChannel)sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; while(sc.read(buff) > 0) { buff.flip(); content += charset.decode(buff); } // 列印輸出讀取的內容 System.out.println("聊天資訊:" + content); // 為下一次讀取作準備 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new NClient().init(); } }
- Selector API
- SelectionKey API