Java NIO-非阻塞通訊
NIO(Non-block IO)指非阻塞通訊,相對於其程式設計的複雜性,通常客戶端並不需要使用非阻塞通訊以提高效能,故這裡只有服務端使用非阻塞通訊方式實現
客戶端:
package com.test.client; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import org.apache.log4j.Logger; import com.test.util.SocketIO; public class Client { static Logger logger = Logger.getLogger(Client.class); private int port = 10000; private SocketChannel socketChannel; public Client(){ try { socketChannel = SocketChannel.open(); InetAddress host = InetAddress.getLocalHost(); InetSocketAddress addr = new InetSocketAddress(host, port); socketChannel.connect(addr); logger.debug("***"); logger.debug("client ip:"+socketChannel.socket().getLocalAddress()); logger.debug("client port:"+socketChannel.socket().getLocalPort()); logger.debug("server ip:"+socketChannel.socket().getInetAddress()); logger.debug("server port:"+socketChannel.socket().getPort()); logger.debug("***"); } catch (IOException e) { e.printStackTrace(); logger.error("Cilent socket establish failed!"); } logger.info("Client socket establish success!"); } public void request(String request){ try{ DataInputStream input = SocketIO.getInput(socketChannel.socket()); DataOutputStream output = SocketIO.getOutput(socketChannel.socket()); if(null != request && !request.equals("")){ byte[] bytes = request.getBytes("utf-8"); output.write(bytes); bytes = new byte[64]; int num = input.read(bytes); byte[] answer = new byte[num]; System.arraycopy(bytes, 0, answer, 0, num); if(num > 0){ logger.info("server answer:"+new String(answer,"utf-8")); }else{ logger.info("No server answer."); } } }catch(Exception e){ e.printStackTrace(); logger.error("client request error"); }finally{ if(null != socketChannel){ try{ socketChannel.close(); }catch(Exception e){ e.printStackTrace(); logger.error("socket close error"); } } } } public static void main(String[] args){ Client client1 = new Client(); //Client client2 = new Client(); client1.request("your name?"); //client2.request("your name?"); } }
服務端:
package com.test.server; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; import org.apache.log4j.Logger; public class Server { static Logger logger = Logger.getLogger(Server.class); private Selector selector; private ServerSocketChannel serverSocketChannel; private int queueNum = 10; private int bindPort = 10000; private int step = 0; private Charset charset = Charset.forName("utf-8"); private ByteBuffer buffer = ByteBuffer.allocate(64); public Server(){ try{ //為ServerSocketChannel監控接收連線就緒事件 //為SocketChannel監控連線就緒事件、讀就緒事件以及寫就緒事件 selector = Selector.open(); //作用相當於傳統通訊中的ServerSocket //支援阻塞模式和非阻塞模式 serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); //非阻塞模式 serverSocketChannel.configureBlocking(false); //serverSocketChannel.socket()會獲得一個和當前通道相關聯的socket serverSocketChannel.socket().bind(new InetSocketAddress(bindPort),queueNum); //註冊接收連線就緒事件 //註冊事件後會返回一個SelectionKey物件用以跟蹤註冊事件控制代碼 //該SelectionKey將會放入Selector的all-keys集合中,如果相應的事件觸發 //該SelectionKey將會放入Selector的selected-keys集合中 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch(Exception e){ e.printStackTrace(); logger.error("Server establish error!"); } logger.info("Server start up!"); } public void service() throws Exception{ //判斷是否有觸發事件 while(selector.select() > 0){ Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); //處理事件後將事件從Selector的selected-keys集合中刪除 iterator.remove(); try{ if(selectionKey.isAcceptable()){ this.Acceptable(selectionKey); }else if(selectionKey.isReadable()){ this.Readable(selectionKey); }else if(selectionKey.isWritable()){ this.Writable(selectionKey); } }catch(Exception e){ e.printStackTrace(); logger.error("event deal exception!"); } } } } private void Acceptable(SelectionKey selectionKey) throws Exception{ logger.info("accept:"+(++step)); ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel(); SocketChannel sc = (SocketChannel)ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); logger.info(selectionKey.hashCode()); } private void Readable(SelectionKey selectionKey) throws Exception{ logger.info("read:"+(++step)); SocketChannel sc = (SocketChannel)selectionKey.channel(); buffer.clear(); int num = sc.read(buffer); String request = ""; if(num > 0){ buffer.flip(); request = charset.decode(buffer).toString(); sc.register(selector, SelectionKey.OP_WRITE,request); }else{ sc.close(); } logger.info(selectionKey.hashCode()+":"+request); } private void Writable(SelectionKey selectionKey) throws Exception{ logger.info("write:"+(++step)); String request = (String)selectionKey.attachment(); SocketChannel sc = (SocketChannel)selectionKey.channel(); String answer = "not supported"; if(request.equals("your name?")){ answer = "server"; } logger.info(selectionKey.hashCode()+":"+answer); buffer.clear(); buffer.put(charset.encode(answer)); buffer.flip(); while(buffer.hasRemaining()) sc.write(buffer); sc.close(); } public static void main(String[] args) { Server server = new Server(); try{ server.service(); }catch(Exception e){ e.printStackTrace(); logger.error("Server run exception!"); } } }
IO工具類:
package com.test.util; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class SocketIO{ public static DataInputStream getInput(Socket socket) throws IOException{ //接收快取區大小,socket獲取輸入流之前設定 socket.setReceiveBufferSize(10); InputStream input = socket.getInputStream(); return new DataInputStream(input); } public static DataOutputStream getOutput(Socket socket) throws IOException{ //傳送快取區大小,socket獲取輸出流之前設定 socket.setSendBufferSize(10); OutputStream output = socket.getOutputStream(); return new DataOutputStream(output); } }
log4j日誌配置檔案:
log4j.rootLogger=debug,logOutput
log console out put
log4j.appender.logOutput=org.apache.log4j.ConsoleAppender
log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout
log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n
server端的執行結果:
INFO[13-10-16 11:40:41][com.test.server.Server] -> Server start up!
INFO[13-10-16 11:40:53][com.test.server.Server] -> accept:1
INFO[13-10-16 11:41:14][com.test.server.Server] -> 20469344
INFO[13-10-16 11:41:21][com.test.server.Server] -> read:2
INFO[13-10-16 11:41:37][com.test.server.Server] -> 11688861:your name?
INFO[13-10-16 11:43:00][com.test.server.Server] -> write:3
INFO[13-10-16 11:43:00][com.test.server.Server] -> 11688861:server
可以看到readable方法中的SelectionKey和writable方法中的SelectionKey的雜湊碼是完全相同的,是同一個SelectionKey
SelectionKey是在SocketChannel類或ServerSocketChannel類註冊要監控的事件時產生的,這兩個類本身並沒有register方法,需要檢視它們共同父類AbstractSelectableChannel(只有關鍵程式碼):
public abstract class AbstractSelectableChannel
extends SelectableChannel{
......
// Keys that have been created by registering this channel with selectors.
// They are saved because if this channel is closed the keys must be
// deregistered. Protected by keyLock.
private SelectionKey[] keys = null;
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException{
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
synchronized (regLock) {
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
private SelectionKey findKey(Selector sel) {
synchronized (keyLock) {
if (keys == null)
return null;
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))
return keys[i];
return null;
}
}
void removeKey(SelectionKey k) { // package-private
synchronized (keyLock) {
for (int i = 0; i < keys.length; i++)
if (keys[i] == k) {
keys[i] = null;
keyCount--;
}
((AbstractSelectionKey)k).invalidate();
}
}
......
}
ServerSocketChannel和Socketchannel向Selector中註冊了特定事件,Selector就會監控這些事件是否發生。ServerSocketChannel和Socketchannel都為AbstractSelectableChannel類的子類,AbstractSelectableChannel類的register方法負責註冊事件,該方法會返回一個SelectionKey物件,該物件用於跟蹤被註冊事件
public abstract class SelectionKey {
protected SelectionKey() { }
public abstract SelectableChannel channel();
public abstract Selector selector();
......
}
一個Selector物件中包含了3種類型的鍵集(即SelectionKey集合,SelectionKey在以下部分被稱為“鍵”)
1,all-keys:所有註冊至該Selector的事件鍵集(selector.keys())
2,selected-keys:相關事件已經被Selector捕獲的鍵集(selector.selectedKeys())
3,cancel-keys:已被取消的鍵集(無法訪問該集合)
selected-keys和cancel-keys都為all-keys的子集,對於一個新建的Selector,這3個鍵集都為空
註冊事件時會將相應的SelectionKey加入Selector的all-keys鍵集中
取消SelectionKey或者關閉了SelectionKey相關聯的Channel,則會將相應的SelectionKey加入cancel-keys鍵集中
當執行選擇器的選擇操作時(selector.select(),對於選擇器來說,這個方法應該是相當重要的):
1,將cancel-keys中的每個SelectionKey從3個鍵集中移除(如果存在的話),並登出SelectionKey所關聯的Channel,cancel-keys鍵集變為空集。
2,如果監控的事件發生,Selector會將相關的SelectionKey加入selected-keys鍵集中
以下為對原始碼的分析、推測:
Selector作為選擇器,儲存了所有的Selectionkey(註冊的,取消的,觸發的),通過上面的AbstractSelectableChannel類的原始碼,發現Channel本身也儲存了一個自身關聯的SelectionKey陣列,這看起來是完全沒有必要的,但是仔細看一下register方法,能看出些許端倪:
Selector本身維護了3個集合,all-keys,selected-keys和cancel-keys,頻繁的註冊操作、取消註冊將會導致這3個集合頻繁的變化,伴隨頻繁變化的是頻繁的加鎖,這會嚴重的降低Selector的效能,畢竟一個Selector會被多個Channel作為選擇器使用,本身非阻塞的實現方式就是提高效能的一種解決方式
當註冊新的事件時,如果存在該通道相關的SelectionKey,則更新該SelectionKey所關注的事件以及其攜帶的附加資訊,如果不存在,則新增新的SelectionKey
這樣做的好處是,比起刪除以前的SelectionKey,新增新的SelectionKey,修改SelectionKey所關注的事件以及其攜帶的附加資訊顯然是更好的選擇,畢竟不需要修改Selector所維護的鍵集,當然也不需要頻繁加鎖(通過檢視Selector類的api,SelectionKey並不是thread-safe的,顯然並沒有加鎖,但是並沒有什麼問題),能夠提供更好的效能
總之,SelectionKey的雜湊碼會重複是很正常的,畢竟不是單純的註冊時新建、觸發後刪除方式,java實現時進行了優化