nio 客戶端與服務端通訊Demo
本篇博文主要是從網上收集和整理眾多網友關於NIO的理解所寫的博文,非作者原創(除最後的服務端與客戶端通訊的Demo),在此宣告。
1. NIO入門概念:
主要參考文獻:Java nio 使用及原理分析
Java NIO 使用及原理分析(一):
主要對緩衝區Buffer的概念和通道Channel的概念進行了簡單的介紹;
緩衝區 實際上是一個數組,在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的; 在寫入資料時,它也是寫入到緩衝區中的;任何時候訪問 NIO 中的資料,都是將它放到緩衝區中。而在面向流I/O系統中,所有資料都是直接寫入或者直接將資料讀取到Stream物件中。
通道是一個物件,通過它可以讀取和寫入資料,當然了所有資料都通過Buffer物件來處理。我們永遠不會將位元組直接寫入通道中,相反是將資料寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將資料從通道讀入緩衝區,再從緩衝區獲取這個位元組。
使用NIO讀取資料 (將chanel對應的終端資料讀入到buffer中, 核心方法inChannel.read(buffer))
1. 從FileInputStream獲取Channel
2. 建立Buffer
3. 將資料從Channel讀取(read)到Buffer中
使用NIO寫入資料 (將buffer中的資料寫入到channel對應的終端,核心方法,channel.write(buffer))
1. 從FileInputStream獲取Channel
2. 建立Buffer
3. 將資料從Channel寫入(write)到Buffer中
import java.io.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class ChannelReadToBuffer { static public void main( String args[] ) throws Exception { Charset charset = Charset.forName("GBK"); CharsetDecoder decoder = charset.newDecoder(); FileInputStream fin = new FileInputStream("e:\\nioTest\\src.txt"); // 獲取通道 FileChannel fc = fin.getChannel(); // 建立緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(512); CharBuffer charBuffer = CharBuffer.allocate(512); // 讀取資料到緩衝區 fc.read(byteBuffer); byteBuffer.flip(); decoder.decode(byteBuffer, charBuffer, false); charBuffer.flip(); System.out.println(charBuffer); fc.close(); fin.close(); } }
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
public class Program {
static private final byte message[] = { 83, 111, 109, 101, 32,
98, 121, 116, 101, 115, 46 };
static public void main( String args[] ) throws Exception {
FileOutputStream fout = new FileOutputStream( "c:\\test.txt" );
FileChannel fc = fout.getChannel();
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
for (int i=0; i<message.length; ++i) {
buffer.put( message[i] );
}
buffer.flip();
fc.write( buffer );
fout.close();
}
}
Java NIO 使用及原理分析(二)和 Java NIO 使用及原理分析(三):
主要介紹了buffer的 position limit capacity flip slice等概念;
緩衝區的分配:allocate()
緩衝區分片:slice()
只讀緩衝區:asReadOnlyBuffer()
記憶體對映檔案I/O:MappedByteBuffer
Java NIO 使用及原理分析(四):
NIO中非阻塞I/O採用了基於Reactor模式的工作方式,I/O呼叫不會被阻塞,相反是註冊感興趣的特定I/O事件,如可讀資料到達,新的套接字連線等等,在發生特定事件時,系統再通知我們。NIO中實現非阻塞I/O的核心物件就是Selector,Selector就是註冊各種I/O事件地 方,而且當那些事件發生時,就是這個物件告訴我們所發生的事件,如下圖所示:
當有讀或寫等任何註冊的事件發生時,可以從Selector中獲得相應的SelectionKey,同時從 SelectionKey中可以找到發生的事件和該事件所發生的具體的SelectableChannel,以獲得客戶端傳送過來的資料
使用NIO中非阻塞I/O編寫伺服器處理程式,大體上可以分為下面三個步驟:
1. 向Selector物件註冊感興趣的事件
2. 從Selector中獲取感興趣的事件
3. 根據不同的事件進行相應的處理
具體博主縮寫的示例也很清楚,只是寫的有點簡單,並且沒有說清客服端如何處理詳細的請求;後面我們給出一個詳細的示例進行展示;
2. 使用selector進行客戶端與服務端的通訊
參考文獻:
(2) Java NIO學習筆記(三) 使用Selector客戶端與伺服器的通訊
3. 客戶端與服務端selector通訊demo
3.1 服務端程式碼:
package com.qian.nio.scoket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ServerSocketDemo {
private static final String IP = "10.86.38.57";
private static final int PORT = 8001;
private static final int BUFFER_SIZE = 128;
//統計客戶端的個數
private static int clientCount = 0;
private static ServerSocketChannel serverChannel = null;
public static void server() throws IOException{
//1. 獲取服務端通道並繫結IP和埠號
serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(IP, PORT));
//2. 將服務端通道設定成非阻塞模式
serverChannel.configureBlocking(false);
//3. 開啟一個選擇器
Selector selector = Selector.open();
//4. 向選擇器上註冊監聽事件(接收事件)// 註冊該事件後,當事件到達的時候,selector.select()會返回, 否則會一直阻塞
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 採用輪訓的方式監聽selector上是否有需要處理的事件,如果有,進行處理
while(true){
// 輪訓selector
selector.select();
//5. 獲取選擇器上所有監聽事件值
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeySet.iterator();
while(it.hasNext()){
//6. 獲取selectionKey值
SelectionKey selectionKey = it.next();
try{//解決客戶端關閉或 服務端讀取不到獲取無法寫入客戶端報IO異常;
//7. 根據key值判斷事件
if(selectionKey.isValid() && selectionKey.isAcceptable()){//測試此鍵的通道是否已準備好接受新的套接字連線。
//8. 接入事件處理
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
//響應客戶端;(沒有必要的)導致客戶端
clientCount++;
replyClientForAcceptable(socketChannel);
} else if(selectionKey.isValid() && selectionKey.isReadable()){
// 處理客戶端傳送來的訊息
dealClientMsg(selectionKey);
// 響應客戶端
replyClientMsg(selectionKey);
} else if(selectionKey.isValid() && selectionKey.isWritable()){
}
//10. 手動刪除selectionKey
it.remove();
}catch(IOException e){
if(selectionKey!=null){
selectionKey.cancel();
}
SocketChannel sc = (SocketChannel) selectionKey.channel();
if(sc!=null){
sc.socket().close();
sc.close();
}
continue;//繼續監聽其他客戶端發來的訊息;
}
}
}
}
/**
* @Description 響應客戶端的接入事件;
* @param socketChannel
* @throws IOException
*/
private static void replyClientForAcceptable(SocketChannel socketChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
buffer.put(("hello client "+clientCount+"!\r\n").getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
/**
* @Description 處理客戶端訊息;
* @param selectionKey
* @throws IOException
*/
private static void dealClientMsg(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
int len = 0;
//將客戶端socket傳送來的資料讀入到buffer中;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
byte[] bytes = new byte[BUFFER_SIZE];
buffer.get(bytes, 0, len);
String msg = new String(bytes,0,len);
//將客戶端發來的訊息列印到控制檯
System.out.println(msg);
}
}
/**
* @Description 回覆客戶端訊息;
* @param selectionKey
* @throws IOException
*/
private static void replyClientMsg(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
buffer.put(("get your message of client !\r\n").getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
public static void main(String[] args) {
try {
server();
} catch (IOException e) {
if(serverChannel != null){
try {
serverChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
客戶端:
package com.qian.nio.scoket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Scanner;
public class ClientSocketDemo {
private static final String IP = "10.86.38.57";
private static final int PORT = 8001;
private static final int BUFFER_SIZE = 128;
public static void send() throws InterruptedException {
SocketChannel socketChannel = null;
try{
//1. 獲取socketChannel
socketChannel = SocketChannel.open();
//2. 建立連線
socketChannel.connect(new InetSocketAddress(IP, PORT));
//3. 設定通道為非阻塞
socketChannel.configureBlocking(false);
// 傳送握手訊息
sendHandMsg(socketChannel);
// 接收握手訊息,實際上是接收的服務端Acceptable中的響應
Thread.sleep(10);
reciveServerMsg(socketChannel);
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
@SuppressWarnings("resource")
Scanner scanner = new Scanner(System.in);//鍵盤輸入
String msg;
System.out.println("Please input your message to server : ");
while (scanner.hasNext()) {
msg = scanner.nextLine();
buffer.put((new Date() + ": " + msg).getBytes());
buffer.flip();
//4. 向通道寫資料(向服務端傳送資料)
socketChannel.write(buffer);
buffer.clear();
// 接收服務端發來的資料
Thread.sleep(10);
reciveServerMsg(socketChannel);
}
}catch(IOException e){
System.out.println("=======服務端已關閉連線 訊息傳送失敗=====");
}finally{
if(socketChannel != null){
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static void sendHandMsg(SocketChannel socketChannel) throws IOException {
//響應服務端的握手在channel.write(buffer)以後服務端才會呼叫Acceptable中響應訊息函式
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
buffer.put((new Date() + ": hello server!\r\n").getBytes());
buffer.flip();
//4. 向通道寫資料
socketChannel.write(buffer);
buffer.clear();
}
/**
* @ Description接收服務端傳送回來的訊息;
* @param socketChannel
* @throws IOException
*/
private static void reciveServerMsg(SocketChannel socketChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while ((socketChannel.read(buffer)) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
send();
}
}
演示結果:
客戶端1:
hello client 1!
get your message of client !
Please input your message to server :
client1 a
get your message of client !
clent1 b
get your message of client !
client1 stop
get your message of client !
client1 end
get your message of client !
客戶端2:
服務端:
幾點說明和註解:
第一:
一個Channel僅僅可以註冊到一個Selector一次,如果將Channel註冊到Selector多次,那麼其實相當於在更新SelectionKey 的 insterest set。
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 註冊到同一個 Selector 兩次了, 那麼第二次的註冊其實就是相當於更新這個 Channel 的 interest set 為 SelectionKey.OP_READ | SelectionKey.OP_WRITE.
第二:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
需要說明的是,select() 方法僅僅是簡單地將就緒的IO操作放到了set<selectionKey>集合中,並且用完後自己不會刪除,因此我們每次在迭代中都要手動的呼叫 keyIterator.remove() 方法將這個key刪除。例如:我們在收到OP_ACCEPT 通知, 然後我們進行相關處理, 但是並沒有將這個 Key 從 SelectedKeys 中刪除, 那麼下一次 select() 返回時 我們還可以在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.
第三:
關於在不同監聽事件中,key.channel()返回物件的區分;
if (key.isAcceptable()) {
// 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
//注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
// 在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
// 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話,
// 那麼 select 方法會一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}