Java NIO | 流程與示例程式碼
部落格引用處(以下內容在原有部落格基礎上進行補充或更改,謝謝這些大牛的部落格指導):
java NIO示例以及流程詳解
Java NIO深入理解ServerSocketChannel
併發程式設計網
Java NIO之Selector(選擇器)
NIO執行流程:
第一步:啟動server伺服器,初始化多路複用器selector、ServerSocketChannel通道、設定通道的模式為非阻塞、註冊channel到selector上,並監聽accept請求;
第二步:啟動server伺服器,迴圈selectionKeys,當有channel準備好時就處理,否則一直迴圈或者用間隔輪詢的方式,比如阻塞1S後再獲取selectionKeys;
第三步:啟動client端,初始化多路複用器selector、SocketChannel通道,設定通道的模式為非阻塞(這是連線準備就緒,還未連線就緒/連線成功);
第四步:client首先嚐試連線server,此時socketChannel.connect(new InetSocketAddress(this.host, this.port)返回false,表示server還沒有返回資訊,server收到連線請求後,監聽到client的接入請求,會初始化一個client通道物件在服務端、並將新接入的client註冊到多路複用器Selector上,並應答client;再回到client端,由於client沒有及時收到server端的應答(因為是非阻塞式的),所以client會設定監聽一個connect請求,socketChannel.register(selector, SelectionKey.OP_CONNECT),當server返回應答資訊時,client會收到一個connect請求,key.isConnectable(),如果此時sc.finishConnect()連線完成,client會監聽一個read請求,並像server傳送資料doWrite(sc),然後server會收到一個read請求,key.isReadable()處理完後返回給client,client也會收到一個讀請求,收到server的返回資料,此時,整個互動過程結束;
服務端:
package nio; import org.springframework.util.StringUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class ServerSocketChannels implements Runnable { private ServerSocketChannel serverSocketChannel; private Selector selector; private volatile boolean stop; public ServerSocketChannels(int port){ try { //建立多路複用器selector,工廠方法 selector = Selector.open(); //建立ServerSocketChannel,工廠方法 serverSocketChannel = ServerSocketChannel.open(); //繫結ip和埠號,預設的IP=127.0.0.1,對連線的請求最大佇列長度設定為backlog=1024,如果佇列滿時收到連線請求,則拒絕連線 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); //設定非阻塞方式 serverSocketChannel.configureBlocking(false); //註冊serverSocketChannel到selector多路服用器上面,監聽accrpt請求 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("the time is start port = " + port); } catch (IOException e) { e.printStackTrace(); //用於非正常退出JVM虛擬機器,停掉所有運作 0代表正常退出,非零代表非正常退出 System.exit(1); } } public void stop(){ this.stop = true; } @Override public void run() { //如果server沒有停止 while(!stop){ try { //selector.select()會一直阻塞到有一個通道在你註冊的事件上就緒了 //selector.select(1000)會阻塞到1s後然後接著執行,相當於1s輪詢檢查 selector.select(1000); //找到所有準備接續的key Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { //處理準備就緒的key handle(key); }catch (Exception e){ if(key != null){ //請求取消此鍵的通道到其選擇器的註冊 key.cancel(); //關閉這個通道 if(key.channel() != null){ key.channel().close(); } } } } } catch (Throwable e) { e.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } public void handle(SelectionKey key) throws IOException { //如果key是有效的 if(key.isValid()){ //監聽到有新客戶端的接入請求 //完成TCP的三次握手,建立物理鏈路層 if(key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = (SocketChannel) ssc.accept(); //設定客戶端鏈路為非阻塞模式 sc.configureBlocking(false); //將新接入的客戶端註冊到多路複用器Selector上 sc.register(selector, SelectionKey.OP_READ); } //監聽到客戶端的讀請求 if(key.isReadable()){ //獲得通道物件 SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //從channel讀資料到緩衝區 int readBytes = sc.read(readBuffer); if (readBytes > 0){ //Flips this buffer. The limit is set to the current position and then // the position is set to zero,就是表示要從起始位置開始讀取資料 readBuffer.flip(); //eturns the number of elements between the current position and the limit. // 要讀取的位元組長度 byte[] bytes = new byte[readBuffer.remaining()]; //將緩衝區的資料讀到bytes陣列 readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("the time server receive order: " + body); String currenttime = "query time order".equals(body) ? new Date(System.currentTimeMillis()).toString(): "bad order"; doWrite(sc, currenttime); }else if(readBytes < 0){ key.channel(); sc.close(); } } } } public static void doWrite(SocketChannel channel, String response) throws IOException { if(!StringUtils.isEmpty(response)){ byte [] bytes = response.getBytes(); //分配一個bytes的length長度的ByteBuffer ByteBuffer write = ByteBuffer.allocate(bytes.length); //將返回資料寫入緩衝區 write.put(bytes); write.flip(); //將緩衝資料寫入渠道,返回給客戶端 channel.write(write); } } }
服務端啟動程式:
package nio;
/**
* 服務端啟動程式
*/
public class ServerMain {
public static void main(String[] args) {
int port = 8010;
ServerSocketChannels server = new ServerSocketChannels(port);
new Thread(server,"timeserver-001").start();
}
}
客戶端程式:
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandler implements Runnable {
//伺服器端的ip
private String host;
//伺服器端的埠號
private int port;
//多路服用選擇器
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(String host, int port){
this.host = host == null ? "127.0.0.1": host;
this.port = port;
try {
//初始化一個Selector,工廠方法
selector = Selector.open();
//初始化一個SocketChannel,工廠方法
socketChannel = SocketChannel.open();
//設定非阻塞模式
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
/**
* 首先嚐試連線服務端
* @throws IOException
*/
public void doConnect() throws IOException {
//如果連線成功,像多路複用器selector監聽讀請求
if(socketChannel.connect(new InetSocketAddress(this.host, this.port))){
socketChannel.register(selector, SelectionKey.OP_READ);
//執行寫操作,像伺服器端傳送資料
doWrite(socketChannel);
}else {
//監聽連線請求
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
public static void doWrite(SocketChannel sc) throws IOException {
//構造請求訊息體
byte [] bytes = "query time order".getBytes();
//構造ByteBuffer
ByteBuffer write = ByteBuffer.allocate(bytes.length);
//將訊息體寫入傳送緩衝區
write.put(bytes);
write.flip();
//呼叫channel的傳送方法非同步傳送
sc.write(write);
//通過hasRemaining方法對傳送結果進行判斷,如果訊息全部發送成功,則返回true
if(!write.hasRemaining()){
System.out.println("send order 2 server successd");
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop){
try {
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> its =keys.iterator();
SelectionKey key = null;
while (its.hasNext()){
key = its.next();
its.remove();
try {
handle(key);
}catch (Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
public void handle(SelectionKey key) throws IOException {
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
//如果連線成功,監聽讀請求
if(sc.finishConnect()){
sc.register(this.selector, SelectionKey.OP_READ);
//像服務端傳送資料
doWrite(sc);
}else{
System.exit(1);
}
}
//監聽到讀請求,從伺服器端接受資料
if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(byteBuffer);
if(readBytes > 0){
byteBuffer.flip();
byte [] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("now body is "+ body);
stop = true;
}else if(readBytes < 0){
key.cancel();
sc.close();
}
}
}
}
//釋放所有與該多路複用器selector關聯的資源
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端啟動程式:
package nio;
/**
* 客戶端啟動程式
*/
public class ClientMain {
public static void main(String[] args) {
int port = 8010;
TimeClientHandler client = new TimeClientHandler("",port);
new Thread(client,"client-001").start();
}
}
解釋說明:
- java.nio.Buffer flip()方法:將快取位元組陣列的指標設定為陣列的開始序列即陣列下標0。這樣就可以從buffer開頭,對該buffer進行遍歷(讀取)了。
也就是說呼叫flip之後,讀寫指標指到快取頭部,並且設定了最多隻能讀出之前寫入的資料長度(而不是整個快取的容量大小)。詳細:https://www.cnblogs.com/woshijpf/articles/3723364.html - backlog引數:
TCP建立連線是要進行三次握手,但是否完成三次握手後,伺服器就處理(accept)呢?
backlog其實是一個連線佇列,在Linux核心2.2之前,backlog大小包括半連線狀態和全連線狀態兩種佇列大小。
半連線狀態為:伺服器處於Listen狀態時收到客戶端SYN報文時放入半連線佇列中,即SYN queue(伺服器埠狀態為:SYN_RCVD)。
全連線狀態為:TCP的連線狀態從伺服器(SYN+ACK)響應客戶端後,到客戶端的ACK報文到達伺服器之前,則一直保留在半連線狀態中;當伺服器接收到客戶端的ACK報文後,該條目將從半連線佇列搬到全連線佇列尾部,即 accept queue (伺服器埠狀態為:ESTABLISHED)。
在Linux核心2.2之後,分離為兩個backlog來分別限制半連線(SYN_RCVD狀態)佇列大小和全連線(ESTABLISHED狀態)佇列大小。 詳細:https://www.cnblogs.com/Orgliny/p/5780796.html