NIO,BIO,AIO,JAVA通訊程式設計學習筆記3
下文內容摘自《Netty 權威指南》
JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO2.0,引人注目的是Java正式提供了非同步檔案IO操作,同時提供了與Unix網路程式設計事件驅動IO對應的AIO,下面的2.4章節我們學習下如何利用NIO2.0編寫AIO程式,我們還是以時間伺服器為例進行講解。
AIO程式設計
NIO2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果:
- 通過java.util.concurrent.Future類來表示非同步操作的結果;
- 在執行非同步操作的時候傳入一個java.nio.channels.
CompletionHandler介面的實現類作為操作完成的回撥。
NIO2.0的非同步套接字通道是真正的非同步非阻塞IO,它對應Unix網路程式設計中的事件驅動IO(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,簡化了NIO的程式設計模型。
下面還是通過程式碼來熟悉NIO2.0 AIO的相關類庫,我們仍舊以時間伺服器為例程進行講解。
AIO 建立的TimeServer原始碼分析
首先看下時間伺服器的主函式:
程式1
我們重點對AsyncTimeServerHandler進行分析,首先看8-15行,在構造方法中,我們首先建立一個非同步的服務端通道AsynchronousServerSocketChannel,然後呼叫它的bind方法繫結監聽埠,如果埠合法且沒被佔用,繫結成功,列印啟動成功提示到控制檯。
線上程的run方法中,第26行我們初始化CountDownLatch物件,它的作用是在完成一組正在執行的操作之前,允許當前的執行緒一直阻塞。在本例程中,我們讓執行緒在此阻塞,防止服務端執行完成退出。在實際專案應用中,不需要啟動獨立的執行緒來處理AsynchronousServerSocketChannel,這裡僅僅是個demo演示。
第24行用於接收客戶端的連線,由於是非同步操作,我們可以傳遞一個
CompletionHandler<AsynchronousSocketChannel,? super A>型別的handler例項接收accept操作成功的通知訊息,在本例程中我們通過AcceptCompletionHandler例項作為handler接收通知訊息,下面,我們繼續對AcceptCompletionHandler進行分析:
程式2
CompletionHandler有兩個方法,分別是:
1) public void completed(AsynchronousSocketChannel result,
AsyncTimeServerHandler attachment);
2) public void failed(Throwable exc, AsyncTimeServerHandler attachment);
下面我們分別對這兩個介面的實現進行分析:首先看completed介面的實現,程式碼7-10行,我們從attachment獲取成員變數AsynchronousServerSocketChannel,然後繼續呼叫它的accept方法。可能讀者在此可能會心存疑惑,既然已經接收客戶端成功了,為什麼還要再次呼叫accept方法呢?原因是這樣的:當我們呼叫AsynchronousServerSocketChannel的accept方法後,如果有新的客戶端連線接入,系統將回調我們傳入的CompletionHandler例項的completed方法,表示新的客戶端已經接入成功,因為一個AsynchronousServerSocketChannel可以接收成千上萬個客戶端,所以我們需要繼續呼叫它的accept方法,接收其它的客戶端連線,最終形成一個迴圈。每當接收一個客戶讀連線成功之後,再非同步接收新的客戶端連線。
鏈路建立成功之後,服務端需要接收客戶端的請求訊息,程式碼第8行我們建立新的ByteBuffer,預分配1M的緩衝區。第8行我們通過呼叫AsynchronousSocketChannel的read方法進行非同步讀操作。下面我們看看非同步read方法的引數:
1) ByteBuffer dst:接收緩衝區,用於從非同步Channel中讀取資料包;
2) A attachment:非同步Channel攜帶的附件,通知回撥的時候作為入參使用;
3) CompletionHandler<Integer,? super A>:接收通知回撥的業務handler,本例程中為ReadCompletionHandler。
下面我們繼續對ReadCompletionHandler進行分析:
程式3
首先看構造方法,我們將AsynchronousSocketChannel通過引數傳遞到ReadCompletionHandler中當作成員變數來使用,主要用於讀取半包訊息和傳送應答。本例程不對半包讀寫進行具體解說,對此感興趣的可以關注後續章節對Netty半包處理的專題介紹。我們繼續看程式碼,第12-25行是讀取到訊息後的處理,首先對attachment進行flip操作,為後續從緩衝區讀取資料做準備。根據緩衝區的可讀位元組數建立byte陣列,然後通過new String方法建立請求訊息,對請求訊息進行判斷,如果是”QUERY
TIME ORDER”則獲取當前系統伺服器的時間,呼叫doWrite方法傳送給客戶端。下面我們對doWrite方法進行詳細分析。
跳到程式碼第28行,首先對當前時間進行合法性校驗,如果合法,呼叫字串的解碼方法將應答訊息編碼成位元組陣列,然後將它拷貝到傳送緩衝區writeBuffer中,最後呼叫AsynchronousSocketChannel的非同步write方法。正如前面介紹的非同步read方法一樣,它也有三個與read方法相同的引數,在本例程中我們直接實現write方法的非同步回撥介面CompletionHandler,程式碼跳到第24行,對傳送的writeBuffer進行判斷,如果還有剩餘的位元組可寫,說明沒有傳送完成,需要繼續傳送,直到傳送成功。
最後,我們關注下failed方法,它的實現很簡單,就是當發生異常的時候,我們對異常Throwable進行判斷,如果是IO異常,就關閉鏈路,釋放資源,如果是其它異常,按照業務自己的邏輯進行處理。本例程作為簡單demo,沒有對異常進行分類判斷,只要發生了讀寫異常,就關閉鏈路,釋放資源。
非同步非阻塞IO版本的時間伺服器服務端已經介紹完畢,下面我們繼續看客戶端的實現
首先看下客戶端主函式的實現,AIO時間伺服器客戶端 TimeClient:
程式4
第15行我們通過一個獨立的IO執行緒建立非同步時間伺服器客戶端handler,在實際專案中,我們不需要獨立的執行緒建立非同步連線物件,因為底層都是通過JDK的系統回撥實現的,在後面執行時間伺服器程式的時候,我們會抓取執行緒呼叫堆疊給大家展示。繼續看程式碼, AsyncTimeClientHandler的實現類原始碼如下:
程式5
由於在AsyncTimeClientHandler中大量使用了內部匿名類,所以程式碼看起來稍微有些複雜,下面我們就對主要程式碼進行詳細解說。
9-17行是構造方法,首先通過AsynchronousSocketChannel的open方法建立一個新的AsynchronousSocketChannel物件。然後跳到第36行,建立CountDownLatch進行等待,防止非同步操作沒有執行完成執行緒就退出。第37行通過connect方法發起非同步操作,它有兩個引數,分別如下:
1) A attachment : AsynchronousSocketChannel的附件,用於回撥通知時作為入參被傳遞,呼叫者可以自定義;
2) CompletionHandler<Void,? super A> handler:非同步操作回撥通知介面,由呼叫者實現。
在本例程中,我們的兩個引數都使用AsyncTimeClientHandler類本身,因為它實現了CompletionHandler介面。
接下來我們看非同步連線成功之後的方法回撥completed方法,程式碼第39行,我們建立請求訊息體,對其進行編碼,然後拷貝到傳送緩衝區writeBuffer中,呼叫AsynchronousSocketChannel的write方法進行非同步寫,與服務端類似,我們可以實現CompletionHandler<Integer, ByteBuffer>介面用於寫操作完成後的回撥,程式碼第45-47行,如果傳送緩衝區中仍有尚未傳送的位元組,我們繼續非同步傳送,如果已經發送完成,則執行非同步讀取操作。
程式碼第64-97行是客戶端非同步讀取時間伺服器服務端應答訊息的處理邏輯,程式碼第49行我們呼叫AsynchronousSocketChannel的read方法非同步讀取服務端的響應訊息,由於read操作是非同步的,所以我們通過內部匿名類實現CompletionHandler<Integer, ByteBuffer>介面,當讀取完成被JDK回撥時,我們構造應答訊息。第56-63行我們從CompletionHandler的ByteBuffer中讀取應答訊息,然後列印結果。
第197-96行,當讀取發生異常時,我們關閉鏈路,同時呼叫CountDownLatch的countDown方法讓AsyncTimeClientHandler執行緒執行完畢,客戶端退出執行。
需要指出的是,正如之前的NIO例程,我們並沒有完整的處理網路的半包讀寫,當對例程進行功能測試的時候沒有問題,但是,如果對程式碼稍加改造,進行壓力或者效能測試,就會發現輸出結果存在問題。
由於半包的讀寫會作為專門的小節在Netty的應用和原始碼分析章節進行詳細講解,在NIO的入門章節我們就不詳細展開介紹,以便讀者能夠將注意力集中在NIO的入門知識上來。
服務端程式:
package AIO;
import java.io.IOException;
public class TimeServer {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8580;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
================================================================
package AIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel
.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
asynchronousServerSocketChannel.accept(this,
new AcceptCompletionHandler());
}
}
================================================================
package AIO;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
================================================================
package AIO;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果沒有傳送完成,繼續傳送
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
================================================================
客戶端程式:
package AIO;
public class TimeClient {
public static void main(String[] args) {
int port = 8580;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start();
}
}
================================================================
package AIO;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result,
ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("Now is : " + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
執行結果:
伺服器端輸出:
The time server is start in port : 8580
The time server receive order : QUERY TIME ORDER
客戶端輸出:
Now is : Wed Nov 02 13:59:35 CST 2016
相關推薦
NIO,BIO,AIO,JAVA通訊程式設計學習筆記3
下文內容摘自《Netty 權威指南》 JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO2.0,引人注目的是Java正式提供了非同步檔案IO操作,同時提供了與Unix網路程式設計事件驅動IO對應的AIO,下面的2.4章節我們學習下如何利用NIO2.0編寫AIO程
java 併發程式設計學習筆記(一)之 併發基礎
併發基礎 併發小測試 java.util.concurrent.Semaphore 類 public class SemTest { /** * Se
java 併發程式設計學習筆記(一)之 基礎框架搭建和併發模擬工具,程式碼
基礎框架搭建和併發模擬工具,程式碼 (1)基礎框架搭建 (2)併發模擬 (3)CountDownLatch 通常用來 保證 幾個執行緒執行完成之後,再執行其他的程式碼 Semaphore
java 併發程式設計學習筆記(七)FutureTask, ForkJoin, BlockingQueue
(1)Future 、FutureTask public class FutureExample { static class MyTask implements Callable<String> { @Override pu
java網路程式設計學習筆記 流式套接字程式設計
tcp是Transmission Control Protocol即傳輸控制協議,是一種面性連線的協議。 在java中使用tcp程式設計需要用到兩個類 1.ServerSocket(代表伺服器) 2.Socket(代表客戶端) 伺服器端程式碼: //伺服器端在埠8888監聽 Serve
ARM體系結構與程式設計學習筆記3
第三章 ARM指令集介紹 ARM的指令集可以分為6類,即跳轉指令,資料處理指令,程式狀態暫存器,Load/Store指令,協處理器指令,和異常中斷產生指令。 1:跳轉指令: 長跳轉: 直接向PC暫存器中寫入目標地址值可以實現4G地址空間的任意跳轉。MOV LR,PC 1:B:跳轉指令 2;B
Java併發程式設計學習記錄#3
共享物件 我們已經見識到同步方法和同步程式碼塊能夠保證操作執行的原子性,但同時這也是一個常見的誤區:同步僅僅關於原子性。其實,同步還有另一個重要而微妙的方面–記憶體可見性。我們不僅僅希望阻止一個執行緒修改另一個執行緒正在使用的物件,我們還希望當一個執行緒修改了某個物件後,其改變後的狀
深入理解Java虛擬機器學習筆記3-執行緒安全和鎖優化
併發處理是壓榨計算機運算能力最有力的工具。 1.執行緒安全 當多個執行緒訪問一個物件時,如果不用考慮這些執行緒執行時環境下排程和交替執行,也不需要進行額外的同步,或者在呼叫方進行任何其他的協調操作,呼叫這個物件的行為都可以獲取正確的結果,那麼這個物件是執行緒安全的。 2
Java併發程式設計實戰筆記3:基礎構建模組
在上文已經說明,委託是構造執行緒安全類的一個最有效策略,也就是讓現有的執行緒安全類管理所有的狀態即可。以下將介紹這些基礎構建模組。 同步容器類 同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx等工廠方法建立的同步封裝器類。這些類實現執行緒安全的方式
Java Socket 程式設計學習(3)
Java Socket 程式設計學習,利用DatagramSocket和DatagramPacket實現UDP資料傳輸 場景描述: 客戶端建立資料包,繫結伺服器地址和埠,向伺服器傳送資料;伺服器繫結埠,從埠接受資料 伺服器程式碼: ServerBean類 package y
【 專欄 】- Java NIO 與 Netty 網路程式設計學習筆記
Java NIO 與 Netty 網路程式設計學習筆記 以一個IM聊天功能的實現。記錄筆者從最原始的阻塞IO(BIO)到JDK1.4提供的非阻塞IO,再到JDK 1.7 非同步IO的學習筆記,最後到Netty框架的學習筆記。
java丨事件驅動程式設計學習筆記(二)
一、匿名監聽器 監聽器類是特意為建立一個GUI元件(例如,一個按鈕)而設計的監聽物件。監聽器類不被其他應用程式所共享,因此,正確的做法是將它作為一個內部類定義在框架中。 可以使用匿名內部類簡化內部類監聽器。匿名內部類時沒有名字的內部類。它進一步完成定義內部類和建立一個該類的例項。 內部類Enlarg
BIO、NIO、AIO及網路程式設計
一. 網路程式設計的一些基礎 1.先說明一下執行緒的掛起、阻塞、睡眠 執行緒從建立、執行到結束總是處於下面五個狀態之一:新建狀態、就緒狀態、執行狀 &nb
阿里Java開發手冊學習筆記(一)----程式設計規約
一、命名規範 不以下劃線(_)或美元符號($)開始/結尾。 不允許中英文混合使用,不允許直接使用中文。 類名使用UpperCamelCase風格;方法名、引數名、成員變數使用lowerCamelC
java丨事件驅動程式設計學習筆記(一)
一、事件和事件源 事件:事件可以定義為程式發生了某些事情的訊號 源物件(源元件):能建立一個事件並觸發該事件的元件成為源物件 事件類的根類:java.util.EventObject 可以使用EventObject類中的例項方法getSource()獲得事件的源物件 如果一
Linux程式設計學習筆記----System V程序間通訊(訊號量)
關於System V Unix System V,是Unix作業系統眾多版本中的一支。它最初由AT&T開發,在1983年第一次釋出,因此也被稱為AT&T System V。一共發行了4個System V的主要版本:版本1、2、3和4。System V Rel
浙大《面向物件程式設計--java語言》學習筆記(第八週:異常處理與輸入輸出)
8.1 異常 ArrayIndex.java package exception; import java.util.Scanner; public class ArrayIndex { public static void main(String[] args) { // TO
浙大《面向物件程式設計--java語言》學習筆記(第四周:繼承和多型)
4.1 繼承 這裡我們有三個檔案 首先是Database.java package dome; import java.util.ArrayList; public class Database { private ArrayList<CD>