Java NIO 非阻塞socket通訊案例
阿新 • • 發佈:2019-01-07
NIO的特性:它以塊為基本單位處理資料,所有的資料都要通過緩衝區(Buffer)來進行傳輸。它有一個用來作為原始I/O操作的抽象通道(Channel)並提供了Selector的非同步網路介面。且支援將檔案對映到記憶體,以大幅提高I/O效率。
緩衝區中有3個重要的引數:
position(位置):即緩衝區的位置,指緩衝區指標到哪個位置了。
capactiy(容量):緩衝區的總上限,如ByteBuffer.allocate(2048),則認為產生的這個bytebuffer的容量為2048
limit(上限):實際存入緩衝區資料所佔的容量,也稱為實際上限。
常用方法:
filp():將position置為0,將limit置為原position位置。經過此操作的buffer資料才能被正常讀取(讀寫轉換)。
clear():將position置為0,limit設定為容量大小,並清除mark標誌位。效果是清空buffer
rewind():將position置為0,並清除標誌位
檔案對映到記憶體的方法:
RandomAccessFile raf = new RandomAccessFile("C:\\test.dat", "rw");
FileChannel fc = raf.getChannel();
//將檔案對映到記憶體中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); while(mbb.hasRemaining()){
System.out.print((char)mbb.get()); }
mbb.put(0,(byte)98); //修改檔案
raf.close();
正文:
Nio非阻塞網路通訊的工作原理為,由一個執行緒監聽多個客戶端,由selector選擇已完成資料準備工作的客戶端,以對請求做出處理。由一個執行緒來監聽多個客戶端的方式能夠省下來大量的執行緒資源。
以下是模擬一次計算請求的程式碼:
服務端:
public class NioServer {
private Selector selector;
private ByteBuffer recive = ByteBuffer.allocate(2048);
private ByteBuffer send = ByteBuffer.allocate(2048);
public NioServer(int port) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
// 設定非阻塞
ssc.configureBlocking(false);
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress(port));
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 啟動伺服器
*
* @throws IOException
*/
public void startup() throws IOException {
System.out.println("=======Nio Server is started!=======");
for (;;) {
int size = selector.select();
if (size > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = keys.iterator(); iterator.hasNext();) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
processor(key);
}
}
}
}
/**
* 停止伺服器
*/
public void shutdown() {
System.out.println("=======Nio Server is stoped!=======");
System.exit(0);
}
/**
* 訊息處理
*
* @throws IOException
*/
public void processor(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel accept = server.accept();
accept.configureBlocking(false);
// 改變監聽狀態
accept.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
recive.clear(); // 在讀取資料之前要保證緩衝區是空的
int size = client.read(recive); //讀取完成後返回-1
if(size == -1) {
//如果沒讀出來東西則不做任何操作
} else {
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
int value = arithmetic(new String(recive.array())); //計算結果[業務部分]
byte[] result = String.valueOf(value).getBytes();
send.clear();
send.put(result);
send.flip();
// 返回結果
client.write(send);
client.register(selector, SelectionKey.OP_READ); //如有大量資料要處理,設定此項 迴圈處理。
}
}
/**
* 計算結果
*
* @return
*/
private int arithmetic(String data) {
//假設這裡面有一堆操作
System.out.println("客戶端資料:"+data);//結果為空,轉換緩衝區沒報錯
return 1;
}
public static void main(String[] args) {
try {
NioServer nioServer = new NioServer(9900);
nioServer.startup();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端:
public class NioClient {
private int port;
private ByteBuffer recive = ByteBuffer.allocate(2048);
private ByteBuffer send = ByteBuffer.allocate(2048);
public NioClient(int port) throws IOException {
this.port = port;
}
/**
* 傳送資料並等待運算結果
*
* @param obj
* @return
*/
public Object send(Object obj) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
Selector selector = Selector.open();
sc.connect(new InetSocketAddress(port));
sc.register(selector, SelectionKey.OP_CONNECT);
Object result = null;
boolean isDone = false;
for (;;) {
if (selector.select() == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = keys.iterator(); iterator.hasNext();) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.isConnectionPending()) {
sc.finishConnect();
sc.register(selector, SelectionKey.OP_WRITE);
}
} else if (key.isReadable()) {
recive.clear();
sc.read(recive);
result = new String(recive.array());
System.out.println("done!");
sc.register(selector, SelectionKey.OP_WRITE);
isDone = true;
sc.close();
} else if (key.isWritable()) {
byte[] datas = ((String)obj).getBytes();
send.clear();
send.put(datas);
//裝載完資料要恢復緩衝區標誌位,不然服務端會在讀取時出現問題
send.flip();
sc.write(send);
sc.register(selector, SelectionKey.OP_READ);
}
} //iterator
if(isDone == true) {
break;
}
} //for true
return result;
}
public static void main(String[] args) {
try {
NioClient client = new NioClient(9900); //連線四則運算伺服器
Object result = client.send("1+1");
System.out.println("計算結果為:"+ String.valueOf(result));
} catch (IOException e) {
e.printStackTrace();
}
}
}
如有錯誤或不合理的地方,煩請大家留言,本人會盡快改正。