1. 程式人生 > 其它 >IO模型之AIO程式碼及其實踐詳解

IO模型之AIO程式碼及其實踐詳解

一、AIO簡介

  AIO是java中IO模型的一種,作為NIO的改進和增強隨JDK1.7版本更新被整合在JDK的nio包中,因此AIO也被稱作是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在於JDK中,NIO於JDK1.4版本釋出更新)的阻塞式讀寫,AIO提供了從建立連線到讀、寫的全非同步操作。AIO可用於非同步的檔案讀寫和網路通訊。

二、同步/非同步、阻塞/非阻塞

  我們先來了解下什麼是同步/非同步,以及什麼是阻塞/非阻塞。在IO操作中,IO分兩階段(一旦拿到資料後就變成了資料操作,不再是IO):

  1. 資料準備階段
  2. 核心空間複製資料到使用者程序緩衝區(使用者空間)階段 在作業系統中,程式執行的空間分為核心空間和使用者空間。 應用程式都是執行在使用者空間的,所以它們能操作的資料也都在使用者空間。
  • 同步和非同步IO的概念:同步是使用者執行緒發起I/O請求後需要等待或者輪詢核心I/O操作完成後才能繼續執行 非同步是使用者執行緒發起I/O請求後仍需要繼續執行,當核心I/O操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。
  • 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要徹底完成後才能返回使用者空間 非阻塞是指I/O操作被呼叫後立即返回一個狀態值,無需等I/O操作徹底完成。

  一般來講: 阻塞IO模型、非阻塞IO模型、IO複用模型(select/poll/epoll)、訊號驅動IO模型都屬於同步IO,因為階段2是阻塞的(儘管時間很短)。同步IO和非同步IO的區別就在於第二個步驟是否阻塞: 如果不阻塞,而是作業系統幫你做完IO操作再將結果返回給你,那麼就是非同步IO。

三、非同步IO模型

            

  非同步IO則採用“訂閱-通知”模式:即應用程式向作業系統註冊IO監聽,然後繼續做自己的事情。當作業系統發生IO事件,並且準備好資料後,在主動通知應用程式,觸發相應的函式。也可以如下圖理解:

            

  和同步IO一樣,非同步IO也是由作業系統進行支援的。微軟的windows系統提供了一種非同步IO技術:IOCP(I/O CompletionPort,I/O完成埠);Linux下由於沒有這種非同步IO技術,所以使用的是epoll對非同步IO進行模擬。

四、JAVA AIO框架簡析

         

  JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路複用IO技術模擬非同步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下負責實現套接字通道的具體類是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”,如下圖在Mac上安裝的JDK可以看到:

    

  另外特別說明一下,請注意在上圖中的“java.nio.channels.NetworkChannel”介面,這個介面同樣被JAVA NIO框架實現了,如上圖所示:SocketChannel以及ServerSocketChannel就是NetworkChannel的實現。

  AIO和同步IO(BIO和NIO)不同在於,IO操作全部委託給了被呼叫者(作業系統),在阻塞和非阻塞IO中,不管是使用阻塞流還是使用select選擇器,使用者程序下一步操作都是依賴作業系統的IO操作結果的,也就是需要同步的。而在AIO中,也就是前面通俗說的,(先寫好回撥操作)呼叫系統的IO操作

  在java中,支援非同步模型的方式有兩個類:

  • Future類
  • Callable介面
  嚴格來說,Future不能算是非同步模型的類,因為future.get()方法是阻塞的,需要等待處理完成;而Callable是回撥,是正宗的非同步模型工具。一般來說,非同步程式設計都是基於回撥的。

五、AIO重要類

  實現一個最簡單的AIO socket通訊server、client,主要需要這些相關的類和介面:

  • AsynchronousServerSocketChannel

    服務端Socket通道類,負責服務端Socket的建立和監聽;

  • AsynchronousSocketChannel

    客戶端Socket通道類,負責客戶端訊息讀寫;

  • CompletionHandler<A,V>

    訊息處理回撥介面,是一個負責消費非同步IO操作結果的訊息處理器;

  • ByteBuffer

    負責承載通訊過程中需要讀、寫的訊息。

  此外,還有可選的用於非同步通道資源共享的AsynchronousChannelGroup類,接下來將一一介紹這些類的主要介面及使用。

  1、AsynchronousServerSocketChannel

  AsynchronousServerSocketChannel是一個流式監聽套接字的非同步通道,是ServerSocketChannel的非同步版本的通道,支援非同步處理。AsynchronousServerSocketChannel的使用和ServerSocketChannel一樣需要經過三個步驟:建立/開啟通道、繫結地址和埠和監聽客戶端連線請求。

  1.1 建立/開啟通道

  簡單地,可以通過呼叫AsynchronousServerSocketChannel的靜態方法open()來建立AsynchronousServerSocketChannel例項:
try {
    AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
    e.printStackTrace();
}

  當開啟通道失敗時,會丟擲一個IOException異常。

  1.2 繫結地址和埠

  通過呼叫AsynchronousServerSocketChannel.bind(SocketAddress)方法來繫結監聽地址和埠:

// 構建一個InetSocketAddress例項以指定監聽的地址和埠,如果需要指定ip,則呼叫InetSocketAddress(ip,port)構造方法建立即可
serverSocketChannel.bind(new InetSocketAddress(port));

  1.3 監聽和接收客戶端連線請求

  監聽客戶端連線請求,主要通過呼叫AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個過載方法:

public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>);
public abstract Future<AsynchronousSocketChannel> accept();

  這兩個過載方法的行為方式完全相同一種基於Future,一種基於回撥,事實上,AIO的很多非同步API都封裝了諸如此類的過載方法:提供CompletionHandle回撥引數或者返回一個Future<T>型別變數。用過Feture介面的都知道,可以呼叫Feture.get()方法阻塞等待呼叫結果。無論是哪種方式來獲取連線,最終的處理物件都是Socket,和ServerSocketChannel不同的是,這裡的socket是封裝在AsynchronousSocketChannel中的。

  基於Future實現:

public void AsynchronousServerSocketChannel() {
        try {
            AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(8888));
            while (true) {
                Future<AsynchronousSocketChannel> conn = channel.accept();
                // 阻塞等待直到future有結果
                AsynchronousSocketChannel asyncSocketChannel = conn.get();
                // 非同步處理連線
                asyncHandle(asyncSocketChannel);
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

  基於回撥:

  public void AsynchronousServerSocketChannelCallback() {
        try {
            AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(8888));
            channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {

                @Override
                public void completed(AsynchronousSocketChannel result, Void attachment) {
            // 接收到新的客戶端連線時呼叫,result就是和客戶端的連線對話,此時可以通過result和客戶端進行通訊 System.out.println("accept completed"); // 非同步處理連線 asyncHandle(result); // 繼續監聽accept channel.accept(null, this); } @Override public void failed(Throwable exc, Void attachment) {
            // accept失敗時回撥 System.out.println("accept failed"); } }); // 讓主執行緒保持存活 while (true) { System.in.read(); } } catch (IOException e) { e.printStackTrace(); } }

  需要注意的是,AsynchronousServerSocketChannel是執行緒安全的,但在任何時候同一時間內只能允許有一個accept操作。因此,必須得等待前一個accept操作完成之後才能啟動下一個accept:

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客戶端連線,此時本次accept已經完成
            // 繼續監聽下一個客戶端連線到來
            serverSocketChannel.accept(serverSocketChannel,this);
            // result即和該客戶端的連線會話
            // 此時可以通過result與客戶端進行互動
          }
          ...
});

  此外,還可以通過以下方法獲取和設定AsynchronousServerSocketChannel的socket選項:

// 設定socket選項
serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設定
boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

  其中StandardSocketOptions類封裝了常用的socket設定選項。

  獲取本地地址:

InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();

  1.4AsynchronousChannelGroup非同步通道組

try {
    ExecutorService pool = Executors.newCachedThreadPool();
    AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
    AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
    e.printStackTrace();
}

  AsynchronousServerSocketChannel提供了設定通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。可以呼叫open(AsynchronousChannelGroup)過載方法建立指定分組的通道,預設情況下,具有 open() 方法的通道屬於一個全域性通道組,可利用如下系統變數對其進行配置:

  • java.nio.channels.DefaultThreadPoolthreadFactory,其不採用預設設定,而是定義一個 java.util.concurrent.ThreadFactory
  • java.nio.channels.DefaultThreadPool.initialSize,指定執行緒池的初始規模

  java.nio.channels.AsynchronousChannelGroup 中的三個實用方法提供了建立新通道組的方法:

withCachedThreadPool()
withFixedThreadPool()
withThreadPool()

  這些方法或者對執行緒池進行定義,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下呼叫建立了具有執行緒池的新的通道組,該執行緒池包含 10 個執行緒,其中每個都構造為來自 Executors 類的執行緒工廠:

AsynchronousChannelGroup tenThreadGroup =
AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());

  三個非同步網路通道都具有 open() 方法的替代版本,它們採用給出的通道組而不是預設通道組。例如,當有非同步操作請求時,此呼叫告訴 channel 使用 tenThreadGroup 而不是預設通道組來獲取執行緒:

AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(tenThreadGroup);

  定義自己的通道組可更好地控制服務於操作的執行緒,並能提供關閉執行緒或者等待終止的機制。

  AsynchronousChannelGroup封裝了處理由繫結到組的非同步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的非同步操作結果的completion-handlers的執行緒池。除了處理I/O事件,該執行緒池還有可能處理其他一些用於支援完成非同步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式開啟AsynchronousServerSocketChannel,可以定製server channel執行的執行緒池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個預設的分組中。

  2、AsynchronousSocketChannel

  AsynchronousSocketChannel和NIO通道是SocketChannel功能相似。是一個流式連線套接字的非同步通道。

  AsynchronousSocketChannel表示服務端與客戶端之間的連線通道。客戶端可以通過呼叫AsynchronousSocketChannel靜態方法open()建立,而服務端則通過呼叫AsynchronousServerSocketChannel.accept()方法後由AIO內部在合適的時候建立。下面以客戶端實現為例,介紹AsynchronousSocketChannel。

  2.1 建立AsynchronousSocketChannel

  需要通過open()建立和開啟一個AsynchronousSocketChannel例項,再呼叫其connect()方法連線到服務端,接著才可以與服務端互動:

// 開啟一個socket通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞等待連線成功
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// 連線成功,接下來可以進行read、write操作

  同AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定製執行緒池。

  2.2 connect

  socketChannel.connect()也提供了CompletionHandler回撥和Future返回值兩個過載方法,上面例子使用帶Future返回值的過載,並呼叫get()方法阻塞等待連線建立完成。

// 基於回撥
public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler);
// 基於Future 呼叫get()方法阻塞等待連線建立完成
public abstract Future<Void> connect(SocketAddress remote);

  2.3 傳送訊息

  可以構建一個ByteBuffer物件並呼叫socketChannel.write(ByteBuffer)方法非同步傳送訊息,並通過CompletionHandler回撥接收處理髮送結果:

ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 傳送完成,result:總共寫入的位元組數
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 傳送失敗
  }
});

  2.4 讀取訊息

  構建一個指定接收長度的ByteBuffer用於接收資料,呼叫socketChannel.read()方法讀取訊息並通過CompletionHandler處理讀取結果:

ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 讀取完成,result:實際讀取的位元組數。如果通道中沒有資料可讀則result=-1。
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 讀取失敗
  }
});

  此外,AsynchronousSocketChannel也封裝了設定/獲取socket選項的方法:

// 設定socket選項
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設定
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

  注意:讀寫操作,有多個過載的Future和回撥式的read和write方法:

    public abstract <A> void read(ByteBuffer dst,
                                  long timeout,
                                  TimeUnit unit,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler);
    public final <A> void read(ByteBuffer dst,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler)
    public abstract Future<Integer> read(ByteBuffer dst);
    public abstract <A> void read(ByteBuffer[] dsts,
                                  int offset,
                                  int length,
                                  long timeout,
                                  TimeUnit unit,
                                  A attachment,
                                  CompletionHandler<Long,? super A> handler);
    // write
    public abstract <A> void write(ByteBuffer src,
                                   long timeout,
                                   TimeUnit unit,
                                   A attachment,
                                   CompletionHandler<Integer,? super A> handler);
    public final <A> void write(ByteBuffer src,
                                A attachment,
                                CompletionHandler<Integer,? super A> handler);
    public abstract Future<Integer> write(ByteBuffer src);
    public abstract <A> void write(ByteBuffer[] srcs,
                                   int offset,
                                   int length,
                                   long timeout,
                                   TimeUnit unit,
                                   A attachment,
                                   CompletionHandler<Long,? super A> handler);

  如下伺服器端示例,使用的是accept返回的channel:

    // 基於future 實際上是同步的讀取方式
    private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
        ByteBuffer dst = ByteBuffer.allocate(1024);
        // based on Future,
    // 實際上是同步處理的方式,為了不將處理變成阻塞式單連線的socket形式,使用子執行緒來獲取輸入流
        new Thread(() -> {
            while (asyncSocketChannel.isOpen()) {
                Future<Integer> readFuture = asyncSocketChannel.read(dst);
                try {
                    // 阻塞等待讀取結果
                    Integer readResult = readFuture.get();
                    if (readResult > 0) {

                        System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
                        dst.clear();

                    } else {
                        // doOtherthing
                    }
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
 
    // 基於回撥
    private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
        asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {

                @Override
                public void completed(Integer result, Void attachment) {
                    if (result > 0) {
                        System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
                        dst.clear();
                    }
                    // 註冊回撥,繼續讀取輸入
                    asyncSocketChannel.read(dst, null, this);

                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    // TODO Auto-generated method stub

                }
            });
    }

  3、CompletionHandler

  CompletionHandler是一個用於消費非同步I/O操作結果的處理器。

  AIO中定義的非同步通道允許指定一個CompletionHandler處理器消費一個非同步操作的結果。從上文中也可以看到,AIO中大部分的非同步I/O操作介面都封裝了一個帶CompletionHandler型別引數的過載方法,使用CompletionHandler可以很方便地處理AIO中的非同步I/O操作結果。CompletionHandler是一個具有兩個泛型型別引數的介面,聲明瞭兩個介面方法:

public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

  其中,泛型V表示I/O操作的結果型別,通過該型別引數消費I/O操作的結果;泛型A為附加到I/O操作中的物件型別,可以通過該型別引數將需要的變數傳入到CompletionHandler實現中使用。因此,AIO中大部分的非同步I/O操作都有一個類似這樣的過載方法:

<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);

  例如,AsynchronousServerSocketChannel.accept()方法:

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

  AsynchronousSocketChannel.write()方法等:

public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)

  當I/O操作成功完成時,會回撥到completed方法,failed方法則在I/O操作失敗時被回撥。需要注意的是:在CompletionHandler的實現中應當即使處理操作結果,以避免一直佔用呼叫執行緒而不能分發其他的CompletionHandler處理器。

六、AIO程式碼實現

  1、服務端

public class Server {
    private static int DEFAULT_PORT = 8888;
    private static AsyncServerHandler serverHandle;
    public volatile static long clientCount = 0;
    public static void start(){
        start(DEFAULT_PORT);
    }
    public static synchronized void start(int port){
        if(serverHandle!=null)
            return;
        serverHandle = new AsyncServerHandler(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args){
        Server.start();
    }
}
public class AsyncServerHandler implements Runnable {
    public CountDownLatch latch;
    public AsynchronousServerSocketChannel channel;
    public AsyncServerHandler(int port) {
        try {
            //建立服務端通道
            channel = AsynchronousServerSocketChannel.open();
            //繫結埠
            channel.bind(new InetSocketAddress(port));
            System.out.println("伺服器已啟動,埠號:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //CountDownLatch初始化
        //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
        //此處,讓現場在此阻塞,防止服務端執行完成後退出
        //也可以使用while(true)+sleep
        //生成環境就不需要擔心這個問題,以為服務端是不會退出的
        latch = new CountDownLatch(1);
        //用於接收客戶端的連線
        channel.accept(this,new AcceptHandler());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//作為handler接收客戶端連線
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
    @Override
    public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
        //繼續接受其他客戶端的請求
        Server.clientCount++;
        System.out.println("連線的客戶端數:" + Server.clientCount);
        serverHandler.channel.accept(serverHandler, this);
        //建立新的Buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //非同步讀  第三個引數為接收訊息回撥的業務Handler
        channel.read(buffer, buffer, new ReadHandler(channel));
    }
    @Override
    public void failed(Throwable exc, AsyncServerHandler serverHandler) {
        exc.printStackTrace();
        serverHandler.latch.countDown();
    }
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    //用於讀取半包訊息和傳送應答
    private AsynchronousSocketChannel channel;
    public ReadHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }
    //讀取到訊息後的處理
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //flip操作
        attachment.flip();
        //根據
        byte[] message = new byte[attachment.remaining()];
        attachment.get(message);
        try {
            String expression = new String(message, "UTF-8");
            System.out.println("伺服器收到訊息: " + expression);
            String calrResult = null;
            try{
                calrResult = Caculator.cal(expression).toString();
            }catch(Exception e){
                calrResult = "計算錯誤:" + e.getMessage();
            }
            //向客戶端傳送訊息
            doWrite(calrResult);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    //傳送訊息
    private void doWrite(String result) {
        byte[] bytes = result.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        //非同步寫資料 引數與前面的read一樣
        channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                //如果沒有傳送完,就繼續傳送直到完成
                if (buffer.hasRemaining())
                    channel.write(buffer, buffer, this);
                else{
                    //建立新的Buffer
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    //非同步讀  第三個引數為接收訊息回撥的業務Handler
                    channel.read(readBuffer, readBuffer, new ReadHandler(channel));
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    channel.close();
                } catch (IOException e) {
                }
            }
        });
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  2、客戶端

public class Client {
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 8888;
    private static AsyncClientHandler clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(clientHandle!=null)
            return;
        clientHandle = new AsyncClientHandler(ip,port);
        new Thread(clientHandle,"Client").start();
    }
    //向伺服器傳送訊息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("q")) return false;
        clientHandle.sendMsg(msg);
        return true;
    }
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception{
        Client.start();
        System.out.println("請輸入請求訊息:");
        Scanner scanner = new Scanner(System.in);
        while(Client.sendMsg(scanner.nextLine()));
    }
}
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;
    public AsyncClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //建立非同步的客戶端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //建立CountDownLatch等待
        latch = new CountDownLatch(1);
        //發起非同步連線操作,回撥引數就是這個類本身,如果連線成功會回撥completed方法
        clientChannel.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        try {
            clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //連線伺服器成功
    //意味著TCP三次握手完成
    @Override
    public void completed(Void result, AsyncClientHandler attachment) {
        System.out.println("客戶端成功連線到伺服器...");
    }
    //連線伺服器失敗
    @Override
    public void failed(Throwable exc, AsyncClientHandler attachment) {
        System.err.println("連線伺服器失敗...");
        exc.printStackTrace();
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //向伺服器傳送訊息
    public void sendMsg(String msg){
        byte[] req = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        //非同步寫
        clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
    }
}
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public WriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        //完成全部資料的寫入
        if (buffer.hasRemaining()) {
            clientChannel.write(buffer, buffer, this);
        } else {
            //讀取資料
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch));
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.err.println("資料傳送失敗...");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
        }
    }
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;
    public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }
    @Override
    public void completed(Integer result,ByteBuffer buffer) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String body;
        try {
            body = new String(bytes,"UTF-8");
            System.out.println("客戶端收到結果:"+ body);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc,ByteBuffer attachment) {
        System.err.println("資料讀取失敗...");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
        }
    }
}

  3、測試類

public class Test {
    //測試主方法
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
        //執行伺服器
        Server.start();
        //避免客戶端先於伺服器啟動前執行程式碼
        Thread.sleep(100);
        //執行客戶端
        Client.start();
        System.out.println("請輸入請求訊息:");
        Scanner scanner = new Scanner(System.in);
        while (Client.sendMsg(scanner.nextLine())) ;
    }
}
public final class Caculator {
    private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
    public static Object cal(String expression) throws ScriptException {
        return jse.eval(expression);
    }
}