1. 程式人生 > >Java-RPC:2)NIO入門初窺

Java-RPC:2)NIO入門初窺

Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java IO API
  • Channels and Buffers(通道和緩衝區):標準的IO基於位元組流和字元流進行操作的,而NIO是基於通道(Channel)和緩衝區(Buffer)進行操作,資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中,Channel同時支援讀和寫。
  • Asynchronous IO(非同步IO):Java NIO可以讓你非同步的使用IO,例如:當執行緒從通道讀取資料到緩衝區時,執行緒還是可以進行其他事情。當資料被寫入到緩衝區時,執行緒可以繼續處理它。從緩衝區寫入通道也類似。
  • Selectors(選擇器):Java NIO引入了選擇器的概念,選擇器用於監聽多個通道的事件(比如:連線開啟,資料到達)。因此,單個的執行緒可以監聽多個數據通道。
下面看下網友通俗的比喻:

Channel對應以前的流,Buffer不是什麼新東西,Selector是因為nio可以使用非同步的非堵塞模式才加入的東西。以前的流總是堵塞的,一個執行緒只要對它進行操作,其它操作就會被堵塞,也就相當於水管沒有閥門,你伸手接水的時候,不管水到了沒有,你就都只能耗在接水(流)上。nio的Channel的加入,相當於增加了水龍頭(有閥門),雖然一個時刻也只能接一個水管的水,但依賴輪換策略,在水量不大的時候,各個水管裡流出來的水,都可以得到妥善接納,這個關鍵之處就是增加了一個接水工,也就是Selector,他負責協調,也就是看哪根水管有水了的話,在當前水管的水接到一定程度的時候,就切換一下:臨時關上當前水龍頭,試著開啟另一個水龍頭(看看有沒有水)。當其他人需要用水的時候,不是直接去接水,而是事前提了一個水桶給接水工,這個水桶就是Buffer。也就是,其他人雖然也可能要等,但不會在現場等,而是回家等,可以做其它事去,水接滿了,接水工會通知他們。這其實也是非常接近當前社會分工細化的現實,也是統分利用現有資源達到併發效果的一種很經濟的手段,而不是動不動就來個並行處理,雖然那樣是最簡單的,但也是最浪費資源的方式。

傳統NIO的示意圖:


1.selector向核心註冊了很多的channel,如果客戶端有請求,selector通過輪詢檢視key中的channel有活躍的東西,然後把活躍的key選擇出來,看是否可以進行連線。
2.若可以連線,則selector再向核心註冊一個accept事件,等待其連線成功,此時服務端繼續做其他的事。
3.待到到客戶端與channel真正連通後,selector繼續向核心註冊一個read事件,然後又繼續做自己之前的事。當資料真正發過來的時,再選擇出有資料的channel進行讀取。

以客戶端從伺服器端獲取網路時間為例

如下是伺服器的實現:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    /**
     * 初始化多路複用器、繫結監聽埠
     * @param port
     */
    public MultiplexerTimeServer(int port) {
	try {
	    selector = Selector.open();
	    servChannel = ServerSocketChannel.open();
	    servChannel.configureBlocking(false);
	    servChannel.socket().bind(new InetSocketAddress(port), 1024);
	    servChannel.register(selector, SelectionKey.OP_ACCEPT);
	    System.out.println("The time server is start in port : " + port);
	} catch (IOException e) {
	    e.printStackTrace();
	    System.exit(1);
	}
    }

    public void stop() {
	this.stop = true;
    }

 
    @Override
    public void run() {
	while (!stop) {
	    try {
		selector.select(1000);
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
		Iterator<SelectionKey> it = selectedKeys.iterator();
		SelectionKey key = null;
		while (it.hasNext()) {
		    key = it.next();
		    it.remove();
		    try {
			handleInput(key);
		    } catch (Exception e) {
			if (key != null) {
			    key.cancel();
			    if (key.channel() != null)
				key.channel().close();
			}
		    }
		}
	    } catch (Throwable t) {
		t.printStackTrace();
	    }
	}

	// 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源
	if (selector != null)
	    try {
		selector.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
    }

    private void handleInput(SelectionKey key) throws IOException {

	if (key.isValid()) {
	    // 處理新接入的請求訊息
	    if (key.isAcceptable()) {
		// Accept the new connection
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel sc = ssc.accept();
		sc.configureBlocking(false);
		// Add the new connection to the selector
		sc.register(selector, SelectionKey.OP_READ);
	    }
	    if (key.isReadable()) {
		// Read the data
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		int readBytes = sc.read(readBuffer);
		if (readBytes > 0) {
		    readBuffer.flip();
		    byte[] bytes = new byte[readBuffer.remaining()];
		    readBuffer.get(bytes);
		    String body = new String(bytes, "UTF-8");
		    System.out.println("The time server receive order : "
			    + body);
		    String currentTime = "QUERY TIME ORDER"
			    .equalsIgnoreCase(body) ? new java.util.Date(
			    System.currentTimeMillis()).toString()
			    : "BAD ORDER";
		    doWrite(sc, currentTime);
		} else if (readBytes < 0) {
		    // 對端鏈路關閉
		    key.cancel();
		    sc.close();
		} else
		    ; // 讀到0位元組,忽略
	    }
	}
    }

    private void doWrite(SocketChannel channel, String response)
	    throws IOException {
	if (response != null && response.trim().length() > 0) {
	    byte[] bytes = response.getBytes();
	    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
	    writeBuffer.put(bytes);
	    writeBuffer.flip();
	    channel.write(writeBuffer);
	}
    }
}
伺服器端的Main方法
public class TimeServer {
    public static void main(String[] args) throws IOException {
	int port = 8080;
	if (args != null && args.length<0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 採用預設值
	    }
	}
	MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
	new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

客服端的實現:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
	this.host = host == null ? "127.0.0.1" : host;
	this.port = port;
	try {
	    selector = Selector.open();
	    socketChannel = SocketChannel.open();
	    socketChannel.configureBlocking(false);
	} catch (IOException e) {
	    e.printStackTrace();
	    System.exit(1);
	}
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
	try {
	    doConnect();
	} catch (IOException e) {
	    e.printStackTrace();
	    System.exit(1);
	}
	while (!stop) {
	    try {
		selector.select(1000);
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
		Iterator<SelectionKey> it = selectedKeys.iterator();
		SelectionKey key = null;
		while (it.hasNext()) {
		    key = it.next();
		    it.remove();
		    try {
			handleInput(key);
		    } catch (Exception e) {
			if (key != null) {
			    key.cancel();
			    if (key.channel() != null)
				key.channel().close();
			}
		    }
		}
	    } catch (Exception e) {
		e.printStackTrace();
		System.exit(1);
	    }
	}

	// 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源
	if (selector != null)
	    try {
		selector.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
    }

    
    private void doConnect() throws IOException {
	// 如果直接連線成功,則註冊到多路複用器上,傳送請求訊息,讀應答
	if (socketChannel.connect(new InetSocketAddress(host, port))) {
	    socketChannel.register(selector, SelectionKey.OP_READ);
	    doWrite(socketChannel);
	} else
	    socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }

    private void doWrite(SocketChannel sc) throws IOException {
	byte[] req = "QUERY TIME ORDER".getBytes();
	ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
	writeBuffer.put(req);
	writeBuffer.flip();
	sc.write(writeBuffer);
	if (!writeBuffer.hasRemaining())
	    System.out.println("Send order 2 server succeed.");
    }
    
    
    private void handleInput(SelectionKey key) throws IOException {

	if (key.isValid()) {
	    // 判斷是否連線成功
	    SocketChannel sc = (SocketChannel) key.channel();
	    if (key.isConnectable()) {
		if (sc.finishConnect()) {
		    sc.register(selector, SelectionKey.OP_READ);
		    doWrite(sc);
		} else
		    System.exit(1);// 連線失敗,程序退出
	    }
	    if (key.isReadable()) {
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		int readBytes = sc.read(readBuffer);
		if (readBytes > 0) {
		    readBuffer.flip();
		    byte[] bytes = new byte[readBuffer.remaining()];
		    readBuffer.get(bytes);
		    String body = new String(bytes, "UTF-8");
		    System.out.println("Now is : " + body);
		    this.stop = true;
		} else if (readBytes < 0) {
		    // 對端鏈路關閉
		    key.cancel();
		    sc.close();
		} else
		    ; // 讀到0位元組,忽略
	    }
	}
    }
}
客戶端的Main方法
public class TimeClient {

    public static void main(String[] args) {

	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 採用預設值
	    }
	}
	new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
		.start();
    }
}