1. 程式人生 > >《Netty 權威指南》—— AIO 建立的TimeServer原始碼分析

《Netty 權威指南》—— AIO 建立的TimeServer原始碼分析

宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,

NIO2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果:

  • 通過java.util.concurrent.Future類來表示非同步操作的結果;
  • 在執行非同步操作的時候傳入一個java.nio.channels.

CompletionHandler介面的實現類作為操作完成的回撥。

NIO2.0的非同步套接字通道是真正的非同步非阻塞IO,它對應Unix網路程式設計中的事件驅動IO(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,簡化了NIO的程式設計模型。

下面還是通過程式碼來熟悉NIO2.0 AIO的相關類庫,我們仍舊以時間伺服器為例程進行講解。

AIO 建立的TimeServer原始碼分析

首先看下時間伺服器的主函式:

public class TimeServer {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}

我們直接從第16行開始看,首先建立非同步的時間伺服器處理類,然後啟動執行緒將AsyncTimeServerHandler拉起,程式碼如下:

public class AsyncTimeServerHandler implements Runnable {

    private int port;

    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public AsyncTimeServerHandler(int port) {
	this.port = port;
	try {
	    asynchronousServerSocketChannel = AsynchronousServerSocketChannel
		    .open();
	    asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
	    System.out.println("The time server is start in port : " + port);
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {

	latch = new CountDownLatch(1);
	doAccept();
	try {
	    latch.await();
	} catch (InterruptedException e) {
	    e.printStackTrace();
	}
    }

    public void doAccept() {
	asynchronousServerSocketChannel.accept(this,
		new AcceptCompletionHandler());
    }

我們重點對AsyncTimeServerHandler進行分析,首先看8-15行,在構造方法中,我們首先建立一個非同步的服務端通道AsynchronousServerSocketChannel,然後呼叫它的bind方法繫結監聽埠,如果埠合法且沒被佔用,繫結成功,列印啟動成功提示到控制檯。
線上程的run方法中,第26行我們初始化CountDownLatch物件,它的作用是在完成一組正在執行的操作之前,允許當前的執行緒一直阻塞。在本例程中,我們讓執行緒在此阻塞,防止服務端執行完成退出。在實際專案應用中,不需要啟動獨立的執行緒來處理AsynchronousServerSocketChannel,這裡僅僅是個demo演示。
第24行用於接收客戶端的連線,由於是非同步操作,我們可以傳遞一個
CompletionHandler<AsynchronousSocketChannel,? super A>型別的handler例項接收accept操作成功的通知訊息,在本例程中我們通過AcceptCompletionHandler例項作為handler接收通知訊息,下面,我們繼續對AcceptCompletionHandler進行分析:

public class AcceptCompletionHandler implements
	CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

    @Override
    public void completed(AsynchronousSocketChannel result,
	    AsyncTimeServerHandler attachment) {
	attachment.asynchronousServerSocketChannel.accept(attachment, this);
	ByteBuffer buffer = ByteBuffer.allocate(1024);
	result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
	exc.printStackTrace();
	attachment.latch.countDown();
    }
}

CompletionHandler有兩個方法,分別是:

1)  public void completed(AsynchronousSocketChannel result,

AsyncTimeServerHandler attachment);

2) public void failed(Throwable exc, AsyncTimeServerHandler attachment);

下面我們分別對這兩個介面的實現進行分析:首先看completed介面的實現,程式碼7-10行,我們從attachment獲取成員變數AsynchronousServerSocketChannel,然後繼續呼叫它的accept方法。可能讀者在此可能會心存疑惑,既然已經接收客戶端成功了,為什麼還要再次呼叫accept方法呢?原因是這樣的:當我們呼叫AsynchronousServerSocketChannel的accept方法後,如果有新的客戶端連線接入,系統將回調我們傳入的CompletionHandler例項的completed方法,表示新的客戶端已經接入成功,因為一個AsynchronousServerSocketChannel可以接收成千上萬個客戶端,所以我們需要繼續呼叫它的accept方法,接收其它的客戶端連線,最終形成一個迴圈。每當接收一個客戶讀連線成功之後,再非同步接收新的客戶端連線。
鏈路建立成功之後,服務端需要接收客戶端的請求訊息,程式碼第8行我們建立新的ByteBuffer,預分配1M的緩衝區。第8行我們通過呼叫AsynchronousSocketChannel的read方法進行非同步讀操作。下面我們看看非同步read方法的引數:
1)   ByteBuffer dst:接收緩衝區,用於從非同步Channel中讀取資料包;

2)  A attachment:非同步Channel攜帶的附件,通知回撥的時候作為入參使用;

3)  CompletionHandler<Integer,? super A>:接收通知回撥的業務handler,本例程中為ReadCompletionHandler。

下面我們繼續對ReadCompletionHandler進行分析:

public class ReadCompletionHandler implements
	CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel channel;

    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
	if (this.channel == null)
	    this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
	attachment.flip();
	byte[] body = new byte[attachment.remaining()];
	attachment.get(body);
	try {
	    String req = new String(body, "UTF-8");
	    System.out.println("The time server receive order : " + req);
	    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
		    System.currentTimeMillis()).toString() : "BAD ORDER";
	    doWrite(currentTime);
	} catch (UnsupportedEncodingException e) {
	    e.printStackTrace();
	}
    }

    private void doWrite(String currentTime) {
	if (currentTime != null && currentTime.trim().length() > 0) {
	    byte[] bytes = (currentTime).getBytes();
	    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
	    writeBuffer.put(bytes);
	    writeBuffer.flip();
	    channel.write(writeBuffer, writeBuffer,
		    new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer buffer) {
			    // 如果沒有傳送完成,繼續傳送
			    if (buffer.hasRemaining())
				channel.write(buffer, buffer, this);
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
			    try {
				channel.close();
			    } catch (IOException e) {
				// ingnore on close
			    }
			}
		    });
	}
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
	try {
	    this.channel.close();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }
}

首先看構造方法,我們將AsynchronousSocketChannel通過引數傳遞到ReadCompletionHandler中當作成員變數來使用,主要用於讀取半包訊息和傳送應答。本例程不對半包讀寫進行具體解說,對此感興趣的可以關注後續章節對Netty半包處理的專題介紹。我們繼續看程式碼,第12-25行是讀取到訊息後的處理,首先對attachment進行flip操作,為後續從緩衝區讀取資料做準備。根據緩衝區的可讀位元組數建立byte陣列,然後通過new String方法建立請求訊息,對請求訊息進行判斷,如果是”QUERY TIME ORDER”則獲取當前系統伺服器的時間,呼叫doWrite方法傳送給客戶端。下面我們對doWrite方法進行詳細分析。

跳到程式碼第28行,首先對當前時間進行合法性校驗,如果合法,呼叫字串的解碼方法將應答訊息編碼成位元組陣列,然後將它拷貝到傳送緩衝區writeBuffer中,最後呼叫AsynchronousSocketChannel的非同步write方法。正如前面介紹的非同步read方法一樣,它也有三個與read方法相同的引數,在本例程中我們直接實現write方法的非同步回撥介面CompletionHandler,程式碼跳到第24行,對傳送的writeBuffer進行判斷,如果還有剩餘的位元組可寫,說明沒有傳送完成,需要繼續傳送,直到傳送成功。

最後,我們關注下failed方法,它的實現很簡單,就是當發生異常的時候,我們對異常Throwable進行判斷,如果是IO異常,就關閉鏈路,釋放資源,如果是其它異常,按照業務自己的邏輯進行處理。本例程作為簡單demo,沒有對異常進行分類判斷,只要發生了讀寫異常,就關閉鏈路,釋放資源。

非同步非阻塞IO版本的時間伺服器服務端已經介紹完畢,下面我們繼續看客戶端的實現。