1. 程式人生 > >《Netty 權威指南》—— AIO建立的TimeClient原始碼分析

《Netty 權威指南》—— AIO建立的TimeClient原始碼分析

宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,

非同步非阻塞IO版本的時間伺服器服務端已經介紹完畢,下面我們繼續看客戶端的實現。

首先看下客戶端主函式的實現,AIO時間伺服器客戶端  TimeClient:

public class TimeClient {

    /**
     * @param args
     */
    public static void main(String[] args) {
	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 採用預設值
	    }
	}
	new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
		"AIO-AsyncTimeClientHandler-001").start();

    }
}

第15行我們通過一個獨立的IO執行緒建立非同步時間伺服器客戶端handler,在實際專案中,我們不需要獨立的執行緒建立非同步連線物件,因為底層都是通過JDK的系統回撥實現的,在後面執行時間伺服器程式的時候,我們會抓取執行緒呼叫堆疊給大家展示。繼續看程式碼, AsyncTimeClientHandler的實現類原始碼如下:

public class AsyncTimeClientHandler implements
	CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
	this.host = host;
	this.port = port;
	try {
	    client = AsynchronousSocketChannel.open();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }

    @Override
    public void run() {
	latch = new CountDownLatch(1);
	client.connect(new InetSocketAddress(host, port), this, this);
	try {
	    latch.await();
	} catch (InterruptedException e1) {
	    e1.printStackTrace();
	}
	try {
	    client.close();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
	byte[] req = "QUERY TIME ORDER".getBytes();
	ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
	writeBuffer.put(req);
	writeBuffer.flip();
	client.write(writeBuffer, writeBuffer,
		new CompletionHandler<Integer, ByteBuffer>() {
		    @Override
		    public void completed(Integer result, ByteBuffer buffer) {
			if (buffer.hasRemaining()) {
			    client.write(buffer, buffer, this);
			} else {
			    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
			    client.read(
				    readBuffer,
				    readBuffer,
				    new CompletionHandler<Integer, ByteBuffer>() {
					@Override
					public void completed(Integer result,
						ByteBuffer buffer) {
					    buffer.flip();
					    byte[] bytes = new byte[buffer
						    .remaining()];
					    buffer.get(bytes);
					    String body;
					    try {
						body = new String(bytes,
							"UTF-8");
						System.out.println("Now is : "
							+ body);
						latch.countDown();
					    } catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					    }
					}

					@Override
					public void failed(Throwable exc,
						ByteBuffer attachment) {
					    try {
						client.close();
						latch.countDown();
					    } catch (IOException e) {
						// ingnore on close
					    }
					}
				    });
			}
		    }

		    @Override
		    public void failed(Throwable exc, ByteBuffer attachment) {
			try {
			    client.close();
			    latch.countDown();
			} catch (IOException e) {
			    // ingnore on close
			}
		    }
		});
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
	exc.printStackTrace();
	try {
	    client.close();
	    latch.countDown();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }
}

由於在AsyncTimeClientHandler中大量使用了內部匿名類,所以程式碼看起來稍微有些複雜,下面我們就對主要程式碼進行詳細解說。

9-17行是構造方法,首先通過AsynchronousSocketChannel的open方法建立一個新的AsynchronousSocketChannel物件。然後跳到第36行,建立CountDownLatch進行等待,防止非同步操作沒有執行完成執行緒就退出。第37行通過connect方法發起非同步操作,它有兩個引數,分別如下:

1)      A attachment : AsynchronousSocketChannel的附件,用於回撥通知時作為入參被傳遞,呼叫者可以自定義;

2)      CompletionHandler<Void,? super A> handler:非同步操作回撥通知介面,由呼叫者實現。

在本例程中,我們的兩個引數都使用AsyncTimeClientHandler類本身,因為它實現了CompletionHandler介面。

接下來我們看非同步連線成功之後的方法回撥completed方法,程式碼第39行,我們建立請求訊息體,對其進行編碼,然後拷貝到傳送緩衝區writeBuffer中,呼叫AsynchronousSocketChannel的write方法進行非同步寫,與服務端類似,我們可以實現CompletionHandler<Integer, ByteBuffer>介面用於寫操作完成後的回撥,程式碼第45-47行,如果傳送緩衝區中仍有尚未傳送的位元組,我們繼續非同步傳送,如果已經發送完成,則執行非同步讀取操作。

程式碼第64-97行是客戶端非同步讀取時間伺服器服務端應答訊息的處理邏輯,程式碼第49行我們呼叫AsynchronousSocketChannel的read方法非同步讀取服務端的響應訊息,由於read操作是非同步的,所以我們通過內部匿名類實現CompletionHandler<Integer, ByteBuffer>介面,當讀取完成被JDK回撥時,我們構造應答訊息。第56-63行我們從CompletionHandler的ByteBuffer中讀取應答訊息,然後列印結果。

第197-96行,當讀取發生異常時,我們關閉鏈路,同時呼叫CountDownLatch的countDown方法讓AsyncTimeClientHandler執行緒執行完畢,客戶端退出執行。

需要指出的是,正如之前的NIO例程,我們並沒有完整的處理網路的半包讀寫,當對例程進行功能測試的時候沒有問題,但是,如果對程式碼稍加改造,進行壓力或者效能測試,就會發現輸出結果存在問題。

由於半包的讀寫會作為專門的小節在Netty的應用和原始碼分析章節進行詳細講解,在NIO的入門章節我們就不詳細展開介紹,以便讀者能夠將注意力集中在NIO的入門知識上來。

下面的小節我們執行AIO版本的時間伺服器程式,並通過列印執行緒堆疊的方式看下JDK回撥非同步Channel CompletionHandler的呼叫情況。