Socket程式設計(二)(NIO)
與Socket和ServerSocket類相對應,NIO也提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。這兩種新增的通道都支援阻塞和非阻塞兩種模式。
1.緩衝區Buffer
Buffer是一個物件,它包含一些要寫入或讀出的資料。在NIO類庫加入Buffer物件,體現了新庫與原I/O的一個重要區別。在面向流的I/O中,可以將資料直接寫入或者將資料直接讀到Stream物件中。
在NIO庫中所有資料都用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中。在寫入資料時,寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。
緩衝區實質上是一個數組。通常它是一個位元組陣列(ByteBuffer),也可以使用其他種類的陣列。但是一個緩衝區不僅僅是一個數組,緩衝區提供了對資料的結構化訪問以及維護讀寫位置等資訊。
最常用的緩衝區是ByteBuffer,一個ByteBuffer提供了一組功能用於操作byte陣列。除了ByteBuffer還有其他緩衝區,除了Boolean型別每一種Java基本型別都對應一種緩衝區。
2.通道Channel
Channel是一個通道,網路資料通過Channel讀取和寫入。通道與流不同之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或OutputStream的子類),而通道可以用於讀、寫或者兩者同時進行。因為Channel是雙全工的,所以它可以比流更好的對映底層作業系統的API。
從類圖中可以看出Channel可以分為兩大類:用於網路讀寫的SelectableChannel和用於檔案操作的FileChannel。
3.多路複用器Selector
多路複用器提供選擇已經就緒的任務的能力。簡單來說Selector會不斷地輪詢註冊其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。
一個多路複用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連線控制代碼1024/2048的限制。這也就意味著只需要一個執行緒負責Selector的輪詢就可以接入成千上萬的客戶端。
服務端程式碼:
//nio 時間伺服器
public class TimeServer {
public static void main(String[] args) {
int port = 9999;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port));
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("時間伺服器在埠啟動:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
System.out.println("選擇器不為空");
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 判斷是否是個有效的控制代碼
if (key.isAcceptable()) {
// 接受請求
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 新增新連線到 selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// 讀取資料
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 為ByteBuffer分配空間大小(位於 jvm)
//ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 為ByteBuffer分配空間大小(基於作業系統)
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
// 將快取位元組陣列的指標設定為陣列的開始序列即陣列下標0
byte[] bytes = new byte[readBuffer.remaining()];
// 返回剩餘的可用長度,此長度為實際讀取的資料長度,最大自然是底層陣列的長度
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("時間伺服器接收資料 : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
? new Date(System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 對端鏈路關閉
key.cancel();
sc.close();
} else {
// 讀取到 0 位元組,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
// 將position 設定為0
channel.write(writeBuffer);
writeBuffer.clear();
}
}
}
程式碼分析:
1.首先會在構造方法中進行資源初始化,建立多路複用器Selector、ServerSocketChannel,對Channel和TCP引數進行配置。如,將ServerSocketChannel設定為非同步非阻塞模式,它的backlog設為1024,系統資源初始化成功後,將ServerSocketChannel註冊到Selector,監聽SelectionKey.OP_ACCEPT操作位。如果資源初始化失敗,則退出。
2.在run方法的while迴圈體中迴圈遍歷selector,設定休眠時間為1s。無論是否有讀寫事件發生,selector每隔1s都被喚醒一次。selector也有一個無參的select方法:當有處於就緒狀態的Channel時,selector將返回該Channel的SelectionKey集合。通過對就緒狀態的Channel集合進行迭代,可以進行網路的非同步讀寫操作。
3.當有客戶端接入時,根據SelectionKey的操作位進行判斷獲知網路事件的型別,通過ServerSocketChannel的accept接受客戶端的連線請求並建立SocketChannel例項。完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。然後將新建立的SocketChannel設定成非同步非阻塞。
4.然後就是讀取客戶端請求,首先建立一個ByteBuffer,然後呼叫SocketChannel的read方法讀取請求碼流。因為此時已經把SocketChannel設定為非同步非阻塞模式,因此它的read是非阻塞的。所以要將返回值進行判斷:
1.返回值大於0:讀取到位元組,對位元組進行編解碼。
2.返回值等於0:沒有讀取到位元組,忽略;
3.返回值小於0:鏈路已經關閉,需要關閉SocketChannel釋放資源。
當讀取到碼流後,進行解碼。首先對readBuffer進行flip操作,它的作用是將緩衝區當前的limit設定為position,position設定為0,用於後續對緩衝區進行讀取操作。然後根據緩衝區可讀位元組個數建立位元組陣列,呼叫ByteBuffer的get操作將緩衝區可讀的位元組陣列複製到新建立的位元組陣列中,最後呼叫字串構造將其打印出來。如果請求的字串是“QUERY TIME ORDER”則把伺服器的當前時間編碼後返回給客戶端。
5.在讀取完資料後會呼叫doWrite方法,首先他會將字串編碼成位元組陣列,根據位元組陣列的容量建立ByteBuffer,呼叫ByteBuffer的put操作將位元組陣列複製到緩衝區中,然後對緩衝區進行flip操作,最後呼叫SocketChannel的write方法將緩衝區中的位元組陣列傳送出去。由於SocketChannel是非同步非阻塞的,他並不保證一次能夠把需要傳送的位元組陣列傳送完,此時會出現“寫半包”問題。我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的ByteBuffer傳送完畢,然後通過ByteBuffer的hasRemain()方法判斷訊息是否傳送完成。
客戶端程式碼:,
public class TimeClient {
public static void main(String[] args) {
int port = 9999;
new Thread(new TimeClientHandler("127.0.0.1", port)).start();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandler implements Runnable {
private int port;
private String host;
private Selector selector;
private SocketChannel channel;
private volatile boolean stop;
public TimeClientHandler(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e1) {
e1.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
// selector每一秒被喚醒一次
selector.select(1000);
// 還回就緒狀態的chanel的 selectedKeys
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
selector = null;
}
}
public void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else {
System.exit(1);// 連線失敗,程序退出
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("當前時間 : " + body);
this.stop = true;
} else if (readBytes < 0) { // 對端鏈路關閉
key.cancel();
sc.close();
} else
; // 讀到0位元組,忽略
}
}
}
private void doConnect() throws IOException {
if (channel.connect(new InetSocketAddress(host, port))) {
channel.register(selector, SelectionKey.OP_READ);
doWrite(channel);
} else {
channel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel schannel) throws IOException {
byte[] bytes = "QUERY TIME ORDER".getBytes();
ByteBuffer buff = ByteBuffer.allocate(bytes.length);
buff.put(bytes);
buff.flip();
schannel.write(buff); // 判斷是否傳送完畢
if (!buff.hasRemaining()) {
System.out.println("傳送成功!");
}
}
}
1.首先初始化NIO的多路複用器和SocketChannel物件。需要注意的是建立SocketChannel之後,需要將其設定為非同步非阻塞模式。
2.doConnect()函式用於傳送連線請求,首先對SocketChannel的connect()操作進行判斷,如果成功則將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_READ;如果沒有沒有成功則說明服務端沒有返回TCP握手應答訊息,但所以此時需要將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_CONNECT,當服務端返回TCP syn-ack訊息後,Selector就能輪詢到這個SocketChannel處於連線就緒狀態。
3.在執行完doConnect()後,通過迴圈不斷輪詢多路複用器Selector。當有就緒的Channel時,執行handleInput()方法。
4.首先對SelectionKey進行判斷,判斷其屬於什麼狀態,如果處於連線則說明服務端已返回ACK應答訊息。這時我們需要對連線結果進行判斷,呼叫SocketChannel的finishConnect()方法。如果返回值為true,說明客戶端連線成功;如果返回false或者丟擲IOException說明連線失敗。在這裡返回值為true,說明連線成功。將SocketChannel註冊到多路複用器上,註冊SelectionKey.OP_READ操作位,監聽網路讀操作,然後將請求訊息給服務端。
此時會呼叫doWrite()方法,這裡構造訊息體然後將其編碼寫入到傳送緩衝區中,最後呼叫SocketChannel的write方法進行傳送。
5.如果客戶端接收到了服務端的應答訊息,則SocketChannel是可讀的,這裡我分配1M進行讀取應答訊息,呼叫SocketChannel的read()方法進行非同步讀取操作。如果讀取到了訊息則對訊息進行解碼最後輸出,然後將stop設為true,執行緒退出迴圈。
6.執行緒退出迴圈後,需要對資源進行釋放。
NIO優點總結:
1.客戶端發起的連線操作是非同步的,可以通過在多路複用器註冊OP_CONNECT等待後續結果,不需要像之前的客戶端那樣被同步阻塞。
2.SocketChannel的讀寫操作都是非同步的,如果沒有可讀寫的資料它不會同步等待,直接返回,這樣I/O通訊執行緒就可以處理其他的鏈路,不需要同步等待這個鏈路可用。
3.執行緒模型的優化:由於JDK的Selector在Linux等主流作業系統上通過epoll實現,它沒有連線控制代碼數的限制,這意味著一個Selector執行緒可以同時處理成千上萬個客戶端連線,而且效能不會隨著客戶端的增加而線性下降。因此它非常適合做高效能、高負載的網路伺服器。