Netty權威指南 第2版學習筆記2——NIO入門
傳統的BIO程式設計
網路程式設計的基本模型是Client/Server模型,通過三次揚建立連線,如果連線建立成功,雙方就可以通過網路套接字進行通訊。
BIO通訊模型圖
採用BIO通訊模型的服務端,通常由一個獨立的Acceptor執行緒負責監聽客戶端的連線,它接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理,處理完成之後,通過輸出流返回應答給客戶端,執行緒銷燬。這是典型的一請求一應答通訊模型。
該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的執行緒個數和客戶端併發訪問數呈1:1的正比關係,由於執行緒是Java虛擬機器非常寶貴的系統資源,當執行緒數膨脹之後,系統的效能將急劇下降,隨著併發訪問量的繼續增大,系統會發生執行緒堆疊溢位、建立新執行緒失敗等問題,並最終導致程序宕機或者僵死,不能對外提供服務。
同步阻塞式I/O建立的TimeServer原始碼分析
TimeServer.java
package com.phei.netty.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class TimeServer {
public static void main(String[] args) throws IOException {
int port=8080;
if(args!=null && args.length>0 ){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatException e){
}
}
ServerSocket server=null;
try{
server=new ServerSocket(port); //建立服務監聽
System.out.println("The time server is start in port:" + port);
Socket socket=null ;
while(true){
socket=server.accept();
new Thread(new TimeServerHandler(socket)).start(); //啟動新的執行緒來處理客戶端請求
}
}
finally{
if(server!=null)
{
System.out.println("The time server close");
server.close();
server=null;
}
}
}
}
TimeServerHandler
package com.phei.netty.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class TimeServerHandler implements Runnable {
private Socket socket;
public TimeServerHandler(Socket socket){
this.socket=socket;
}
@Override
public void run() {
// TODO Auto-generated method stub
BufferedReader in =null;
PrintWriter out=null;
try{
in=new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out=new PrintWriter(this.socket.getOutputStream(),true);
String currentTime=null;
String body=null;
while(true){
body=in.readLine();
if(body==null)break;
System.out.println("The time server receive order:"+body);
currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
System.currentTimeMillis()).toString():"BAD ORDER";
out.println(currentTime);
}
}
catch(Exception e){
if(in!=null){
try{
in.close();
}
catch(IOException e1){
e1.printStackTrace();
}
}
if(out!=null){
out.close();
out=null;
}
if(this.socket!=null){
try{
this.socket.close();
}catch(IOException e1){
e1.printStackTrace();
}
this.socket=null;
}
}
}
}
Server執行後,使用Java VisualVM檢視執行緒:
可看到執行緒處於阻塞狀態。
BIO 客戶端原始碼分析
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class TimeClient {
public static void main(String[] args) {
// TODO Auto-generated method stub
int port=8080;
if(args!=null && args.length>0){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatException e){
}
}
Socket socket=null;
BufferedReader in = null;
PrintWriter out=null;
try{
socket=new Socket("127.0.0.1",port);
in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
out=new PrintWriter(socket.getOutputStream(),true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server success.");
String resp=in.readLine();
System.out.println("Now is : " + resp);
}catch(Exception e){
}finally{
if(out!=null){
out.close();
out=null;
}
if(in!=null){
try{
in.close();
}catch(IOException e){
e.printStackTrace();
}
in=null;
}
if(socket!=null){
try{
socket.close();
}catch(IOException e){
e.printStackTrace();
}
socket=null;
}
}
}
}
BIO的主要的問題在於,每當有一個新的客戶端請求接入時,服務端必須建立一個新的執行緒處理接入的客戶端鏈路,一個執行緒只能處理一個客戶端連線。在高效能伺服器應用領域,往往需要面向成千上萬個客戶端的併發連線,這種模型顯然無法滿足要求。
為了改進一執行緒一連線模型,後來又演進出一種通過執行緒池或者訊息佇列實現1個或者多個執行緒處理N個客戶端的模型,由於它的底層通訊機制依然使用同步阻塞I/O,所以被稱為“偽非同步”
偽非同步I/O程式設計
原始碼分析
TimeServer.java
package com.phei.netty.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class TimeServer {
public static void main(String[] args) throws IOException {
int port=8080;
if(args!=null && args.length>0){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatException e){
}
}
ServerSocket server=null;
try{
server=new ServerSocket(port);
System.out.println("The time server is start in port:" + port);
Socket socket=null;
TimeServerHandlerExecutePool signleExecutor=new TimeServerHandlerExecutePool(50,10000);
while(true){
socket=server.accept();
signleExecutor.execute(new TimeServerHandler(socket));
}
}
finally{
if(server!=null)
{
System.out.println("The time server close");
server.close();
server=null;
}
}
}
}
主函式首先建立一個時間伺服器處理類的執行緒池,當接收到新的客戶端連線時,將請求Socket封裝成一個Task,然後呼叫執行緒池的execute方法執行,從而避免了每個請求接入都建立一個新的執行緒。
TimeServerHandler.java
package com.phei.netty.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class TimeServerHandler implements Runnable {
private Socket socket;
public TimeServerHandler(Socket socket){
this.socket=socket;
}
@Override
public void run() {
// TODO Auto-generated method stub
BufferedReader in =null;
PrintWriter out=null;
try{
in=new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out=new PrintWriter(this.socket.getOutputStream(),true);
String currentTime=null;
String body=null;
while(true){
body=in.readLine();
if(body==null)break;
System.out.println("The time server receive order:"+body);
currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
System.currentTimeMillis()).toString():"BAD ORDER";
out.println(currentTime);
}
}
catch(Exception e){
if(in!=null){
try{
in.close();
}
catch(IOException e1){
e1.printStackTrace();
}
}
if(out!=null){
out.close();
out=null;
}
if(this.socket!=null){
try{
this.socket.close();
}catch(IOException e1){
e1.printStackTrace();
}
this.socket=null;
}
}
}
}
TimeServerHandlerExecutePool
package com.phei.netty.bio;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TimeServerHandlerExecutePool {
private ExecutorService executor;
public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){
executor=new ThreadPoolExecutor(Runtime.getRuntime()
.availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS,
new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
}
public void execute(java.lang.Runnable task){
executor.execute(task);
}
}
ThreadPoolExecutor
執行緒池類為Java.util.concurrent.ThreadPoolExecutor,常用的構造方法是:
ThreadPoolExecutor(int corePoolSize, 執行緒池維護執行緒的最少數量
int maximumPoolSize, 執行緒池維護執行緒的最大數量
long keepAliveTime, 執行緒池維護執行緒所允許的空閒時間
TimeUnit unit, 執行緒池維護執行緒所允許的空閒時間的單位
BlockingQueue workQueue, 執行緒池所使用的緩衝佇列
RejectedExecutionHandler handler) 執行緒池對拒絕任務的處理策略
一個任務通過execute(Runnable)方法被新增到執行緒池,任務就是一個Runnable型別的物件,任務的執行方法就是Runnable型別物件的run()方法。
當一個任務通過execute(Runnable)方法欲新增到執行緒池時,幾種情況:
- 如果紫時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒處於空閒狀態,也要建立新的執行緒來處理被新增的任務
- 如果此時執行緒池中的數量等於corePoolSize,但緩衝佇列workQueue未滿,那麼任務被放入緩衝佇列
- 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於aximumPoolSize,建新的執行緒來處理被新增的任務
- 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先順序為:核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務
- 當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數
handler
1.ThreadPoolExecutor.AbortPolicy() 丟擲java.util.concurrent.RejectedExecutionException異常
2.ThreadPoolExecutor.CallerRunsPolicy()
當丟擲RejectedExecutionException異常時,會呼叫rejectedExecution方法
3.ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務
4. ThreadPoolExecutor.DiscardPolicy()
拋棄當前的任務
由於執行緒池和訊息佇列都是有界的,因此,無論客戶端併發連線數多大,它都不會導致執行緒個數過於膨脹或者記憶體溢位,相比於傳統的一連線一執行緒模型,是一種改良。但它底層仍是同步阻塞模型,無法從根本上解決問題。
此段關於偽非同步的弊端分析,《Netty權威指南第2版》23頁描述比較詳細。
基於NIO的非阻塞程式設計
Non-block I/O
NIO類庫簡介
新的輸入/輸出(NIO)庫是JDK1.4中引入的。下面是幾個相關概念
1.緩衝區Buffer
Buffer是一個物件,包含一些要寫入或者要讀出的資料。在NIO物件中加入Buffer物件,體現了新庫與原I/O的一個重要區別。在面向流的I/O中,可以將資料直接寫入或者將資料直接讀到Stream物件中。
在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入時,寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。
緩衝區實質是一個數組,通常是ByteBuffer,也可以使用其它種類的陣列。緩衝區提供了對資料的結構化訪問以及維護讀寫位置limit等資訊。
最常用的緩衝區ByteBuffer,提供了一組功能用於操作byte陣列。除了ByteBuffer,還有其它一些緩衝區。事實上,每一種Java基本型別(除了Boolean型別)都對應一種緩衝區,類圖繼承關係:
2.通道 Channel
Channel,就像自來水管一樣,網路資料通過Channel讀取和寫入。通道與流的不同之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),而通道可以用於讀、寫或者二者同時進行。
因為Channel是全雙工的,所以它可以比流更好地對映底層作業系統的API。特別是UNIX網路程式設計模型中,底層作業系統的通道都是全雙工的。
Channel繼承關係類圖:
Channel可以分為兩大類:用於網路讀寫的SelectableChannel和用於檔案操作的FileChannel
3.多路複用器Selector
它是JavaNIO程式設計的基礎,提供選擇已經就緒任務的能力。Selector會不斷地輪詢註冊在其上的Channel,如果某個Channel上面發生讀/寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。
一個Selector可以同時輪詢多個Channel,由於JDK使用epoll()代替select實現,它沒有最大連線控制代碼1024/2048的限制,一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端。
NIO 服務端序列圖
NIO服務端的主要建立過程:
1.步驟一:開啟ServerSocketChannel,用於監聽客戶端的連線,它是所有客戶端連線的父管道
ServerSocketChannel acceptorSvr=ServerSocketChannel.open();
2.步驟二:繫結監聽埠,設定連線為非阻塞模式
acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptorSvr.configureBlocking(false);
3.步驟三:建立Reactor執行緒,建立多路複用器並啟動執行緒
selector selector=Selector.open();
New Thread(new ReactorTask()).start();
4.步驟四:將ServerSocketChannel註冊到Reactor執行緒的Selector上,監聽Accept事件
SelectionKey key=acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
5.步驟五:Selector線上程run方法的無限迴圈體內輪詢準備就緒的Key
int num=selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it =selecterdKeys.iterator();
while(it.hasNext()){
SelectionKey key=(SelectionKey)it.next();
}
6.步驟六 Selector監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路
SocketChannel channel = svrChannel.accept();
7.步驟七 設定客戶端鏈路為非阻塞模式
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
8.步驟八 將新接入的客戶端連線註冊到Reactor執行緒的Selector上,監聽讀操作,讀取客戶端傳送的網路訊息
SelectionKey key=socketChannel.register(selector,SelectionKey.OP_READ,ioHandler);
9.步驟九 非同步讀取客戶端請求訊息到緩衝區
int readNumber=channel.read(receivedBuffer);
10.步驟十 對ByteBuffer進行硬編碼,如果有半包訊息指標reset,繼續讀取後續的報文,將解碼成功的訊息封裝成Task,投遞到業務執行緒池中,進行業務邏輯編排
Object message=null;
while(buffer.hasRemain())
{
byteBuffer.mark();
Object message=decode(byteBuffer);
if(message==null)
{
byteBuffer.reset();
break;
}
messagerList.add(message);
}
if(!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if(messageList!=null & !messageList.isEmpty())
{
for(Object messageE:messageList)
handlerTask(messageE);
}
- 步驟十一 將POJO物件encode成ByteBuffer,呼叫SocketChannel的非同步write介面,將訊息非同步傳送給客戶端
socketChannel.write(buffer);
如果傳送區TCP緩衝區滿,會導致寫半包,此時,需要註冊監聽寫操作位,迴圈寫,直到整包訊息寫入TCP緩衝區。
原始碼分析
TimeServer.java
package com.phei.netty.bio;
import java.io.IOException;
public class TimeServer {
public static void main(String[] args) throws IOException {
int port=8080;
if(args!=null && args.length>0){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatException e){
}
}
MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
new Thread(timeServer,"NIOMultiplexerTimeServer-001").start();
}
}
MultiplexerTimeServer
package com.phei.netty.bio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
//資源初始化
selector = Selector.open(); //建立Selector
servChannel = ServerSocketChannel.open(); //建立ServerSocketChannel
servChannel.configureBlocking(false); //設定為非同步非阻塞模式
servChannel.socket().bind(new InetSocketAddress(port), 1024); backlog為1024
servChannel.register(selector, SelectionKey.OP_ACCEPT); //註冊channel到Selector,監聽OP_ACCEPT操作位
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
//遍歷selector,休眠時間為1s,無論是否有讀寫等事件發生,每隔1s selector喚醒一次
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
//處理新接入的請求訊息
if(key.isAcceptable()){
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
SocketChannel sc=ssc.accept(); //接收客戶端的連線請求並建立SocketChannel例項。完成上述操作後,相當於完成了TCP的三次握手,物理鏈路正式建立
sc.configureBlocking(false); //非同步非阻塞,同時可以對TCP引數進行設定,如TCP接收和傳送緩衝區的大小等
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
//讀取客戶端的請求訊息
SocketChannel sc=(SocketChannel)key.channel();
ByteBuffer readBuffer=ByteBuffer.allocate(1024); //建立一個ByteBuffer緩衝區
int readBytes=sc.read(readBuffer); //讀取請求碼流,read是非阻塞的
if(readBytes>0){ //讀到了位元組
readBuffer.flip(); //將緩衝區當前的limit設定為position,position設定為0,用於對緩衝區的讀取操作,然後根據緩衝區可讀的位元組個數建立位元組陣列
byte[] bytes=new byte[readBuffer.remaining()];
readBuffer.get(bytes); //將緩衝區可讀的位元組陣列複製到新建立的位元組陣列中
String body=new String(bytes,"UTF-8");
System.out.println("The time server receive order : "+body);
String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes<0){ //鏈路已經關閉,需要關閉SocketChannel,釋放資源
key.cancel();
sc.close();
}else
; //讀到0位元組,忽略
}
}
}
private void doWrite(SocketChannel channel,String response) throws IOException{
if(response!=null && response.trim().length()>0){
byte[] bytes=response.getBytes();
ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
//由於SocketChannel是非同步非阻塞的,它並不保證一次能夠把需要傳送的位元組陣列傳送完,此時會出現“寫半包”問題。我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的ByteBuffer傳送完畢,然後可以通過ByteBuffer的hasRemain()方法判斷訊息是否傳送完成。此程式碼沒有演示這部分操作
}
}
}
NIO客戶端序列圖
1.步驟一 開啟SocketChannel
2.步驟二 設定SocketChannel為非阻塞模式,同時設定客戶端連線的TCP引數
3.步驟三 非同步連線服務端
4.步驟四 判斷是否連線成功…
5.步驟五 向Reacctor執行緒的Selector註冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答
clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
6.步驟六 建立Reactor執行緒,建立Selector
7.步驟七 Selector線上程run方法的無限迴圈體內輪詢準備就緒的Key
8.步驟八 接收connect事件進行處理
if(key.isConnectable())
//handlerConnect();
9.步驟九 判斷連線結果,如果連線成功,註冊讀事件到Selector
if(channel.finishConnect())
registerRead();
- 步驟十 註冊讀事件到Selector
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
- 步驟十一 非同步讀客戶端請求訊息到緩衝區
int readNumber=channel.read(receivedBuffer);
- 步驟十二 對ByteBuffer進行編解碼,如果有半包訊息接收緩衝區Reset,繼續讀取後續的報文,將解碼成功的訊息封裝成Task,投遞到業務執行緒池中,進行業務邏輯編排。
Object message=null;
while(buffer.hasRemain()){
byteBuffer.mark();
Object message = decode(byteBuffer);
if(message==null)
{
byteBuffer.reset();
break;
}
messageList.add(message);
}
if(!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if(messageList!=null & !messageList.isEmpty())
{
for(Object messageE : messageList)
handlerTask(messageE);
}
- 步驟十三 將POJO物件encode成ByteBuffer,呼叫SockeChannel的非同步write介面,將訊息非同步傳送給客戶端
socketChannel.write(buffer);
示例:
TimeClient.java
import com.phei.netty.nio.TimeClientHandle;
public class TimeClient {
public static void main(String[] args) {
// TODO Auto-generated method stub
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClien-001").start();
}
}
TimeClientHandle.java
package com.phei.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host,int port){
this.host=host==null?"127.0.0.1":host;
this.port=port;
try{
selector=Selector.open();
socketChannel=SocketChannel.open();
socketChannel.configureBlocking(false); //設定為非同步非阻塞模式,下面可以設定SocketChannel的TCP引數,例如接收和傳送的TCP緩衝區大小
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try{
//傳送連線請求
doConnect();
}catch(IOException e){
e.printStackTrace();
//作為示例,這裡認為連線是成功的,沒有進行重連操作
System.exit(1);
}
while(!stop){
try{
//輪詢,當有就緒的Channel時,執行handleInput(key)方法
selector.select(1000);
Set<SelectionKey> selectedKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectedKeys.iterator();
SelectionKey key=null;
while(it.hasNext()){
key=it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key!=null){
key.cancel();
if(key.channel()!=null)
key.channel().close();
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//Selector關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源。
if(selector !=null)
try{
selector.close();
}catch(IOException e){
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//判斷是否連線成功
SocketChannel sc=(SocketChannel)key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
//客戶端連線成功,監聽網路讀操作,然後傳送請求訊息給服務端
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}else
//客戶端連線失敗
System.exit(1);
}
if(key.isReadable()){
ByteBuffer readBuffer=ByteBuffer.allocate(1024); //分配1M ByteBuffer
int readBytes=sc.read(readBuffer);
if(readBytes>0){
readBuffer.flip();
byte[] bytes=new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body=new String(bytes,"UTF-8");
System.out.println("Now is : " + body);
this.stop=true;
}else if(readBytes<0){
key.cancel();
sc.close();
}else
;
}
}
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
//如果連線成功,則將SocketChannel註冊到Selector上,註冊SectionKey.OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else
//如果沒有連線成功,則說明服務端沒有返回TCP握手應答訊息,但這並不代表連線失敗。需要將SocketChannel註冊到Selector上,註冊SelectionKey.OP_CONNECT。當服務端返回TCP syn-ack訊息後,Selector就能夠輪詢到這個SocketChannel處於連線就緒狀態
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void doWrite(SocketChannel sc)throws IOException{
byte[] req="QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer); //這裡同樣存在“半包寫”問題
if(!writeBuffer.hasRemaining()) //如果傳送結果全部發送完成
System.out.println("Send order 2 server succeed.");
}
}
JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO 2.0 。JAVA正式提供了非同步檔案I/O操作,同時提供了與UNIX網路程式設計事件驅動I/O對應的AIO。
基於NIO2.0的非同步非阻塞(AIO)程式設計
NIO 2.0 引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供以下兩種方式獲取操作結果:
- 通過java.util.concurrent.Future 類來表示非同步操作的結果
- 在執行非同步操作的時候傳入一個java.nio.channels
CompletionHandler介面的實現類作為操作完成的回撥。
AIO 相對於NIO程式設計更為簡單,非同步SocketChannel是被動執行物件,不需要像NIO程式設計那樣建立一個獨立的I/O來處理讀寫操作。
目前國內商用的主流Java版本仍是JDK1.6 ,NIO2.0使用相對較少。
為什麼要使用NIO
不同I/O模型對比
JAVA原生NIO一些問題
- NIO類庫和API繁雜,使用麻煩
- 需要具備其它的額外技能做鋪墊,如熟悉JAVA多執行緒程式設計。這是因為NIO程式設計涉及到Reactor模式,必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的NIO程式。
- 可靠效能力補齊的工作量和難度都非常大,如:客戶端面臨重連、網路閃斷、半包讀寫、失敗快取、網路擁塞和異常碼流的處理等問題
- JDK NIO的BUG,如epoll bug,會導致Selector空輪詢,最終導致CPU100%