Java NIO理解與使用
Netty的使用或許我們看著官網user guide還是很容易入門的。因為java nio使用非常的繁瑣,netty對java nio進行了大量的封裝。對於Netty的理解,我們首先需要了解NIO的原理和使用。所以,我也特別渴望去了解NIO這種通訊模式。
官方的定義是:nio 是non-blocking的簡稱,在jdk1.4 裡提供的新api 。Sun 官方標榜的特性如下: 為所有的原始型別提供(Buffer)快取支援。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支援鎖和記憶體對映檔案的檔案訪問介面。 提供多路(non-bloking) 非阻塞式的高伸縮性網路I/O 。是不是很抽象?
在閱讀《NIO入門》這篇技術文件之後,收穫了很多。包括對Java NIO的理解和使用,所以也特別的感謝作者。
首先,還是來回顧以下從這篇文件中學到的要點。
為什麼要使用 NIO?
NIO 的建立目的是為了讓 Java 程式設計師可以實現高速 I/O 而無需編寫自定義的本機程式碼。NIO 將最耗時的 I/O 操作(即填充和提取緩衝區)轉移回作業系統,因而可以極大地提高速度。
NIO最重要的組成部分
通道 Channels
緩衝區 Buffers
選擇器 Selectors
Buffer 是一個物件, 它包含一些要寫入或者剛讀出的資料。
在 NIO 庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的。在寫入資料時,它是寫入到緩衝區中的。任何時候訪問 NIO 中的資料,您都是將它放到緩衝區中。
緩衝區實質上是一個數組。通常它是一個位元組陣列,但是也可以使用其他種類的陣列。但是一個緩衝區不 僅僅 是一個數組。緩衝區提供了對資料的結構化訪問,而且還可以跟蹤系統的讀/寫程序。
Channel是一個物件,可以通過它讀取和寫入資料
看完下面這個例子,基本上就理解buffer和channel的作用了
package yyf.java.nio.ibm; import java.io.*; import java.nio.*; import java.nio.channels.*; public class CopyFile { static public void main(String args[]) throws Exception { String infile = "c://test/nio_copy.txt"; String outfile = "c://test/result.txt"; FileInputStream fin = new FileInputStream(infile); FileOutputStream fout = new FileOutputStream(outfile); // 獲取讀的通道 FileChannel fcin = fin.getChannel(); // 獲取寫的通道 FileChannel fcout = fout.getChannel(); // 定義緩衝區,並指定大小 ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { // 清空緩衝區 buffer.clear(); //從通道讀取一個數據到緩衝區 int r = fcin.read(buffer); //判斷是否有從通道讀到資料 if (r == -1) { break; } //將buffer指標指向頭部 buffer.flip(); //把緩衝區資料寫入通道 fcout.write(buffer); } } }
緩衝區主要是三個變數
position
limit
capacity
這三個變數一起可以跟蹤緩衝區的狀態和它所包含的資料。我們將在下面的小節中詳細分析每一個變數,還要介紹它們如何適應典型的讀/寫(輸入/輸出)程序。在這個例子中,我們假定要將資料從一個輸入通道拷貝到一個輸出通道。
Position
您可以回想一下,緩衝區實際上就是美化了的陣列。在從通道讀取時,您將所讀取的資料放到底層的陣列中。 position 變數跟蹤已經寫了多少資料。更準確地說,它指定了下一個位元組將放到陣列的哪一個元素中。因此,如果您從通道中讀三個位元組到緩衝區中,那麼緩衝區的 position 將會設定為3,指向陣列中第四個元素。
同樣,在寫入通道時,您是從緩衝區中獲取資料。 position 值跟蹤從緩衝區中獲取了多少資料。更準確地說,它指定下一個位元組來自陣列的哪一個元素。因此如果從緩衝區寫了5個位元組到通道中,那麼緩衝區的 position 將被設定為5,指向陣列的第六個元素。
Limit
limit 變量表明還有多少資料需要取出(在從緩衝區寫入通道時),或者還有多少空間可以放入資料(在從通道讀入緩衝區時)。
position 總是小於或者等於 limit。
Capacity
緩衝區的 capacity 表明可以儲存在緩衝區中的最大資料容量。實際上,它指定了底層陣列的大小 ― 或者至少是指定了准許我們使用的底層陣列的容量。
limit 決不能大於 capacity。
緩衝區作為一個數組,這三個變數就是其中資料的標記,也很好理解。
Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好準備的元件。這樣,一個單獨的執行緒可以管理多個channel,從而管理多個網路連線。
接下來來看看具體的使用把,我建立了一個直接收訊息的伺服器(一邊接收一邊寫資料可能對於新手不好理解)
服務端:
package yyf.java.nio.test;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioReceiver {
@SuppressWarnings("null")
public static void main(String[] args) throws Exception {
ByteBuffer echoBuffer = ByteBuffer.allocate(8);
ServerSocketChannel ssc = ServerSocketChannel.open();
Selector selector = Selector.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(8080);
ss.bind(address);
SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("開始監聽……");
while (true) {
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey sKey = (SelectionKey) it.next();
SocketChannel channel = null;
if (sKey.isAcceptable()) {
ServerSocketChannel sc = (ServerSocketChannel) key.channel();
channel = sc.accept();// 接受連線請求
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
it.remove();
} else if (sKey.isReadable()) {
channel = (SocketChannel) sKey.channel();
while (true) {
echoBuffer.clear();
int r = channel.read(echoBuffer);
if (r <= 0) {
channel.close();
System.out.println("接收完畢,斷開連線");
break;
}
System.out.println("##" + r + " " + new String(echoBuffer.array(), 0, echoBuffer.position()));
echoBuffer.flip();
}
it.remove();
} else {
channel.close();
}
}
}
}
}
客戶端(NIO):
package yyf.java.nio.test;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioTest {
public static void main(String[] args) throws Exception {
ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = null;
Selector selector = null;
channel = SocketChannel.open();
channel.configureBlocking(false);
// 請求連線
channel.connect(new InetSocketAddress("localhost", 8080));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
if (key.isConnectable()) {
if (channel.isConnectionPending()) {
if (channel.finishConnect()) {
// 只有當連線成功後才能註冊OP_READ事件
key.interestOps(SelectionKey.OP_READ);
echoBuffer.put("123456789abcdefghijklmnopq".getBytes());
echoBuffer.flip();
System.out.println("##" + new String(echoBuffer.array()));
channel.write(echoBuffer);
System.out.println("寫入完畢");
} else {
key.cancel();
}
}
}
}
}
}
執行結果:
開始監聽……
##8 12345678
##8 9abcdefg
##8 hijklmno
##2 pq
接收完畢,斷開連線
當然,BIO的客戶端也可以,開啟10個BIO客戶端執行緒
package yyf.java.nio.test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import yyf.java.test.Main;
public class BioClientTest {
public static void main(String[] args) throws Exception {
BioClient n = new BioClient();
for (int i = 0; i < 10; i++) {
Thread t1 = new Thread(n);
t1.start();
}
}
}
class BioClient implements Runnable {
@Override
public void run() {
try {
Socket socket = new Socket("127.0.0.1", 8080);
OutputStream os = socket.getOutputStream();
InputStream is = socket.getInputStream();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
String str = Thread.currentThread().getName() + "...........sadsadasJava";
os.write(str.getBytes());
StringBuffer sb = new StringBuffer();
byte[] b = new byte[1024];
int len;
while ((len = is.read(b)) != -1) {
bos.write(b, 0, len);
}
is.close();
os.close();
socket.close();
System.out.println(Thread.currentThread().getName() + " 寫入完畢 " + new String(bos.toByteArray()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
執行結果:
##8 Thread-4
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-3
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-9
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-7
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-0
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-5
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-2
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-8
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-1
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-6
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
當然,這只是一個測試,對於一個伺服器,是有讀取,也有寫出的,這是文件給的一個服務端例子
package yyf.java.nio.ibm;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class MultiPortEcho {
private int ports[];
private ByteBuffer echoBuffer = ByteBuffer.allocate(5);
public MultiPortEcho(int ports[]) throws IOException {
this.ports = ports;
go();
}
private void go() throws IOException {
Selector selector = Selector.open();
for (int i = 0; i < ports.length; ++i) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
ss.bind(address);
SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Going to listen on " + ports[i]);
}
while (true) {
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
it.remove();
System.out.println("Got connection from " + sc);
} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
SocketChannel sc = (SocketChannel) key.channel();
int bytesEchoed = 0;
while (true) {
echoBuffer.clear();
int r = sc.read(echoBuffer);
if (r <= 0) {
sc.close();
break;
}
echoBuffer.flip();
sc.write(echoBuffer);
bytesEchoed += r;
}
System.out.println("Echoed " + bytesEchoed + " from " + sc);
it.remove();
}
}
// System.out.println( "going to clear" );
// selectedKeys.clear();
// System.out.println( "cleared" );
}
}
static public void main(String args[]) throws Exception {
int ports[] = new int[] { 8080 };
for (int i = 0; i < args.length; ++i) {
ports[i] = Integer.parseInt(args[i]);
}
new MultiPortEcho(ports);
}
}
現在,我們就寫個客戶端去跟伺服器通訊,把發過去的返回來:
package yyf.java.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import javax.swing.ButtonGroup;
public class NioClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8080);
socketChannel.connect(socketAddress);
String str = "你好a";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(buffer);
socketChannel.socket().shutdownOutput();
buffer.clear();
byte[] bytes;
int count = 0;
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip();
bytes = new byte[count];
buffer.get(bytes);
System.out.println(new String(buffer.array()));
buffer.clear();
}
socketChannel.socket().shutdownInput();
socketChannel.socket().close();
socketChannel.close();
}
}
執行結果
server:
Going to listen on 8080
Got connection from java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:63584]
Echoed 7 from java.nio.channels.SocketChannel[closed]
client:
你好a
對於NIO的入門就先到這裡。