IO模型之AIO程式碼及其實踐詳解
一、AIO簡介
AIO是java中IO模型的一種,作為NIO的改進和增強隨JDK1.7版本更新被整合在JDK的nio包中,因此AIO也被稱作是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在於JDK中,NIO於JDK1.4版本釋出更新)的阻塞式讀寫,AIO提供了從建立連線到讀、寫的全非同步操作。AIO可用於非同步的檔案讀寫和網路通訊。
二、同步/非同步、阻塞/非阻塞
我們先來了解下什麼是同步/非同步,以及什麼是阻塞/非阻塞。在IO操作中,IO分兩階段(一旦拿到資料後就變成了資料操作,不再是IO):
- 資料準備階段
- 核心空間複製資料到使用者程序緩衝區(使用者空間)階段 在作業系統中,程式執行的空間分為核心空間和使用者空間。 應用程式都是執行在使用者空間的,所以它們能操作的資料也都在使用者空間。
- 同步和非同步IO的概念:同步是使用者執行緒發起I/O請求後需要等待或者輪詢核心I/O操作完成後才能繼續執行 非同步是使用者執行緒發起I/O請求後仍需要繼續執行,當核心I/O操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。
- 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要徹底完成後才能返回使用者空間 非阻塞是指I/O操作被呼叫後立即返回一個狀態值,無需等I/O操作徹底完成。
一般來講: 阻塞IO模型、非阻塞IO模型、IO複用模型(select/poll/epoll)、訊號驅動IO模型都屬於同步IO,因為階段2是阻塞的(儘管時間很短)。同步IO和非同步IO的區別就在於第二個步驟是否阻塞: 如果不阻塞,而是作業系統幫你做完IO操作再將結果返回給你,那麼就是非同步IO。
三、非同步IO模型
非同步IO則採用“訂閱-通知”模式:即應用程式向作業系統註冊IO監聽,然後繼續做自己的事情。當作業系統發生IO事件,並且準備好資料後,在主動通知應用程式,觸發相應的函式。也可以如下圖理解:
和同步IO一樣,非同步IO也是由作業系統進行支援的。微軟的windows系統提供了一種非同步IO技術:IOCP(I/O CompletionPort,I/O完成埠);Linux下由於沒有這種非同步IO技術,所以使用的是epoll對非同步IO進行模擬。
四、JAVA AIO框架簡析
JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路複用IO技術模擬非同步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下負責實現套接字通道的具體類是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”,如下圖在Mac上安裝的JDK可以看到:
另外特別說明一下,請注意在上圖中的“java.nio.channels.NetworkChannel”介面,這個介面同樣被JAVA NIO框架實現了,如上圖所示:SocketChannel以及ServerSocketChannel就是NetworkChannel的實現。
AIO和同步IO(BIO和NIO)不同在於,IO操作全部委託給了被呼叫者(作業系統),在阻塞和非阻塞IO中,不管是使用阻塞流還是使用select選擇器,使用者程序下一步操作都是依賴作業系統的IO操作結果的,也就是需要同步的。而在AIO中,也就是前面通俗說的,(先寫好回撥操作)呼叫系統的IO操作在java中,支援非同步模型的方式有兩個類:
- Future類
- Callable介面
五、AIO重要類
實現一個最簡單的AIO socket通訊server、client,主要需要這些相關的類和介面:
AsynchronousServerSocketChannel
服務端Socket通道類,負責服務端Socket的建立和監聽;
AsynchronousSocketChannel
客戶端Socket通道類,負責客戶端訊息讀寫;
CompletionHandler<A,V>
訊息處理回撥介面,是一個負責消費非同步IO操作結果的訊息處理器;
ByteBuffer
負責承載通訊過程中需要讀、寫的訊息。
此外,還有可選的用於非同步通道資源共享的AsynchronousChannelGroup
類,接下來將一一介紹這些類的主要介面及使用。
1、AsynchronousServerSocketChannel
AsynchronousServerSocketChannel是一個流式監聽套接字的非同步通道,是ServerSocketChannel的非同步版本的通道,支援非同步處理。AsynchronousServerSocketChannel的使用和ServerSocketChannel一樣需要經過三個步驟:建立/開啟通道、繫結地址和埠和監聽客戶端連線請求。
1.1 建立/開啟通道
簡單地,可以通過呼叫AsynchronousServerSocketChannel的靜態方法open()來建立AsynchronousServerSocketChannel例項:try { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); }
當開啟通道失敗時,會丟擲一個IOException異常。
1.2 繫結地址和埠
通過呼叫AsynchronousServerSocketChannel.bind(SocketAddress)方法來繫結監聽地址和埠:
// 構建一個InetSocketAddress例項以指定監聽的地址和埠,如果需要指定ip,則呼叫InetSocketAddress(ip,port)構造方法建立即可 serverSocketChannel.bind(new InetSocketAddress(port));
1.3 監聽和接收客戶端連線請求
監聽客戶端連線請求,主要通過呼叫AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個過載方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>); public abstract Future<AsynchronousSocketChannel> accept();
這兩個過載方法的行為方式完全相同一種基於Future,一種基於回撥,事實上,AIO的很多非同步API都封裝了諸如此類的過載方法:提供CompletionHandle回撥引數或者返回一個Future<T>型別變數。用過Feture介面的都知道,可以呼叫Feture.get()方法阻塞等待呼叫結果。無論是哪種方式來獲取連線,最終的處理物件都是Socket,和ServerSocketChannel不同的是,這裡的socket是封裝在AsynchronousSocketChannel
中的。
基於Future實現:
public void AsynchronousServerSocketChannel() { try { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(8888)); while (true) { Future<AsynchronousSocketChannel> conn = channel.accept(); // 阻塞等待直到future有結果 AsynchronousSocketChannel asyncSocketChannel = conn.get(); // 非同步處理連線 asyncHandle(asyncSocketChannel); } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
基於回撥:
public void AsynchronousServerSocketChannelCallback() { try { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(8888)); channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { @Override public void completed(AsynchronousSocketChannel result, Void attachment) {
// 接收到新的客戶端連線時呼叫,result就是和客戶端的連線對話,此時可以通過result和客戶端進行通訊 System.out.println("accept completed"); // 非同步處理連線 asyncHandle(result); // 繼續監聽accept channel.accept(null, this); } @Override public void failed(Throwable exc, Void attachment) {
// accept失敗時回撥 System.out.println("accept failed"); } }); // 讓主執行緒保持存活 while (true) { System.in.read(); } } catch (IOException e) { e.printStackTrace(); } }
需要注意的是,AsynchronousServerSocketChannel是執行緒安全的,但在任何時候同一時間內只能允許有一個accept操作。因此,必須得等待前一個accept操作完成之後才能啟動下一個accept:
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客戶端連線,此時本次accept已經完成 // 繼續監聽下一個客戶端連線到來 serverSocketChannel.accept(serverSocketChannel,this); // result即和該客戶端的連線會話 // 此時可以通過result與客戶端進行互動 } ... });
此外,還可以通過以下方法獲取和設定AsynchronousServerSocketChannel的socket選項:
// 設定socket選項 serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設定 boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
其中StandardSocketOptions類封裝了常用的socket設定選項。
獲取本地地址:
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
1.4AsynchronousChannelGroup非同步通道組
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10); AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group); } catch (IOException e) { e.printStackTrace(); }
AsynchronousServerSocketChannel提供了設定通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。可以呼叫open(AsynchronousChannelGroup)過載方法建立指定分組的通道,預設情況下,具有 open() 方法的通道屬於一個全域性通道組,可利用如下系統變數對其進行配置:
java.nio.channels.DefaultThreadPoolthreadFactory
,其不採用預設設定,而是定義一個 java.util.concurrent.ThreadFactoryjava.nio.channels.DefaultThreadPool.initialSize
,指定執行緒池的初始規模
java.nio.channels.AsynchronousChannelGroup 中的三個實用方法提供了建立新通道組的方法:
withCachedThreadPool() withFixedThreadPool() withThreadPool()
這些方法或者對執行緒池進行定義,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下呼叫建立了具有執行緒池的新的通道組,該執行緒池包含 10 個執行緒,其中每個都構造為來自 Executors 類的執行緒工廠:
AsynchronousChannelGroup tenThreadGroup = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
三個非同步網路通道都具有 open() 方法的替代版本,它們採用給出的通道組而不是預設通道組。例如,當有非同步操作請求時,此呼叫告訴 channel 使用 tenThreadGroup 而不是預設通道組來獲取執行緒:
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(tenThreadGroup);
定義自己的通道組可更好地控制服務於操作的執行緒,並能提供關閉執行緒或者等待終止的機制。
AsynchronousChannelGroup封裝了處理由繫結到組的非同步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的非同步操作結果的completion-handlers的執行緒池。除了處理I/O事件,該執行緒池還有可能處理其他一些用於支援完成非同步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式開啟AsynchronousServerSocketChannel,可以定製server channel執行的執行緒池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個預設的分組中。
2、AsynchronousSocketChannel
AsynchronousSocketChannel和NIO通道是SocketChannel功能相似。是一個流式連線套接字的非同步通道。
AsynchronousSocketChannel表示服務端與客戶端之間的連線通道。客戶端可以通過呼叫AsynchronousSocketChannel靜態方法open()建立,而服務端則通過呼叫AsynchronousServerSocketChannel.accept()方法後由AIO內部在合適的時候建立。下面以客戶端實現為例,介紹AsynchronousSocketChannel。
2.1 建立AsynchronousSocketChannel
需要通過open()建立和開啟一個AsynchronousSocketChannel例項,再呼叫其connect()方法連線到服務端,接著才可以與服務端互動:
// 開啟一個socket通道 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); // 阻塞等待連線成功 socketChannel.connect(new InetSocketAddress(ip,port)).get(); // 連線成功,接下來可以進行read、write操作
同AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定製執行緒池。
2.2 connect
socketChannel.connect()也提供了CompletionHandler回撥和Future返回值兩個過載方法,上面例子使用帶Future返回值的過載,並呼叫get()方法阻塞等待連線建立完成。
// 基於回撥 public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler); // 基於Future 呼叫get()方法阻塞等待連線建立完成 public abstract Future<Void> connect(SocketAddress remote);
2.3 傳送訊息
可以構建一個ByteBuffer物件並呼叫socketChannel.write(ByteBuffer)方法非同步傳送訊息,並通過CompletionHandler回撥接收處理髮送結果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes()); socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 傳送完成,result:總共寫入的位元組數 } @Override public void failed(final Throwable exc, final Object attachment) { // 傳送失敗 } });
2.4 讀取訊息
構建一個指定接收長度的ByteBuffer用於接收資料,呼叫socketChannel.read()方法讀取訊息並通過CompletionHandler處理讀取結果:
ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 讀取完成,result:實際讀取的位元組數。如果通道中沒有資料可讀則result=-1。 } @Override public void failed(final Throwable exc, final Object attachment) { // 讀取失敗 } });
此外,AsynchronousSocketChannel也封裝了設定/獲取socket選項的方法:
// 設定socket選項 socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設定 boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
注意:讀寫操作,有多個過載的Future和回撥式的read和write方法:
public abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler); public final <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler) public abstract Future<Integer> read(ByteBuffer dst); public abstract <A> void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler); // write public abstract <A> void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler); public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler); public abstract Future<Integer> write(ByteBuffer src); public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler);
如下伺服器端示例,使用的是accept返回的channel:
// 基於future 實際上是同步的讀取方式 private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) { ByteBuffer dst = ByteBuffer.allocate(1024); // based on Future, // 實際上是同步處理的方式,為了不將處理變成阻塞式單連線的socket形式,使用子執行緒來獲取輸入流 new Thread(() -> { while (asyncSocketChannel.isOpen()) { Future<Integer> readFuture = asyncSocketChannel.read(dst); try { // 阻塞等待讀取結果 Integer readResult = readFuture.get(); if (readResult > 0) { System.out.println(new String(dst.array(), StandardCharsets.UTF_8)); dst.clear(); } else { // doOtherthing } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }).start(); } // 基於回撥 private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) { asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { if (result > 0) { System.out.println(new String(dst.array(), StandardCharsets.UTF_8)); dst.clear(); } // 註冊回撥,繼續讀取輸入 asyncSocketChannel.read(dst, null, this); } @Override public void failed(Throwable exc, Void attachment) { // TODO Auto-generated method stub } }); }
3、CompletionHandler
CompletionHandler是一個用於消費非同步I/O操作結果的處理器。
AIO中定義的非同步通道允許指定一個CompletionHandler處理器消費一個非同步操作的結果。從上文中也可以看到,AIO中大部分的非同步I/O操作介面都封裝了一個帶CompletionHandler型別引數的過載方法,使用CompletionHandler可以很方便地處理AIO中的非同步I/O操作結果。CompletionHandler是一個具有兩個泛型型別引數的介面,聲明瞭兩個介面方法:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
其中,泛型V表示I/O操作的結果型別,通過該型別引數消費I/O操作的結果;泛型A為附加到I/O操作中的物件型別,可以通過該型別引數將需要的變數傳入到CompletionHandler實現中使用。因此,AIO中大部分的非同步I/O操作都有一個類似這樣的過載方法:
<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);
例如,AsynchronousServerSocketChannel.accept()方法:
public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
AsynchronousSocketChannel.write()方法等:
public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)
當I/O操作成功完成時,會回撥到completed方法,failed方法則在I/O操作失敗時被回撥。需要注意的是:在CompletionHandler的實現中應當即使處理操作結果,以避免一直佔用呼叫執行緒而不能分發其他的CompletionHandler處理器。
六、AIO程式碼實現
1、服務端
public class Server { private static int DEFAULT_PORT = 8888; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } }
public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler(int port) { try { //建立服務端通道 channel = AsynchronousServerSocketChannel.open(); //繫結埠 channel.bind(new InetSocketAddress(port)); System.out.println("伺服器已啟動,埠號:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch初始化 //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞 //此處,讓現場在此阻塞,防止服務端執行完成後退出 //也可以使用while(true)+sleep //生成環境就不需要擔心這個問題,以為服務端是不會退出的 latch = new CountDownLatch(1); //用於接收客戶端的連線 channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
//作為handler接收客戶端連線 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { //繼續接受其他客戶端的請求 Server.clientCount++; System.out.println("連線的客戶端數:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //建立新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //非同步讀 第三個引數為接收訊息回撥的業務Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //用於讀取半包訊息和傳送應答 private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到訊息後的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //flip操作 attachment.flip(); //根據 byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("伺服器收到訊息: " + expression); String calrResult = null; try{ calrResult = Caculator.cal(expression).toString(); }catch(Exception e){ calrResult = "計算錯誤:" + e.getMessage(); } //向客戶端傳送訊息 doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //傳送訊息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //非同步寫資料 引數與前面的read一樣 channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果沒有傳送完,就繼續傳送直到完成 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //建立新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //非同步讀 第三個引數為接收訊息回撥的業務Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
2、客戶端
public class Client { private static String DEFAULT_HOST = "localhost"; private static int DEFAULT_PORT = 8888; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向伺服器傳送訊息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("請輸入請求訊息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //建立非同步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //建立CountDownLatch等待 latch = new CountDownLatch(1); //發起非同步連線操作,回撥引數就是這個類本身,如果連線成功會回撥completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //連線伺服器成功 //意味著TCP三次握手完成 @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客戶端成功連線到伺服器..."); } //連線伺服器失敗 @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("連線伺服器失敗..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //向伺服器傳送訊息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //非同步寫 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } }
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部資料的寫入 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //讀取資料 ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("資料傳送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客戶端收到結果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("資料讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
3、測試類
public class Test { //測試主方法 @SuppressWarnings("resource") public static void main(String[] args) throws Exception { //執行伺服器 Server.start(); //避免客戶端先於伺服器啟動前執行程式碼 Thread.sleep(100); //執行客戶端 Client.start(); System.out.println("請輸入請求訊息:"); Scanner scanner = new Scanner(System.in); while (Client.sendMsg(scanner.nextLine())) ; } }
public final class Caculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException { return jse.eval(expression); } }