JAVA NIO工作原理及程式碼示例
簡介:本文主要介紹了JAVA NIO中的Buffer, Channel, Selector的工作原理以及使用它們的若干注意事項,最後是利用它們實現伺服器和客戶端通訊的程式碼例項。
歡迎探討,如有錯誤敬請指正
1. ByteBuffer
1.1直接緩衝區和非直接緩衝區
下面是建立ByteBuffer物件的幾種方式
|
allocate |
|
allocateDirect |
|
wrap |
|
wrap |
allocate方式建立的ByteBuffer物件我們稱之為非直接緩衝區,這個ByteBuffer物件(和物件包含的緩衝陣列)都位於JVM的堆區。wrap方式和allocate方式建立的ByteBuffer沒有本質區別,都建立的是非直接緩衝區。
allocateDirect方法建立的ByteBuffer我們稱之為直接緩衝區,此時ByteBuffer物件本身在堆區,而緩衝陣列位於非堆區, ByteBuffer物件內部儲存了這個非堆緩衝陣列的地址。在非堆區的緩衝陣列可以通過JNI(內部還是系統呼叫)方式進行IO操作,JNI不受gc影響,機器碼執行速度也比較快,同時還避免了JVM堆區與作業系統核心緩衝區的資料拷貝,所以IO速度比非直接緩衝區快。然而allocateDirect方式建立ByteBuffer物件花費的時間和回收該物件花費的時間比較多,所以這個方法適用於建立那些需要重複使用的緩衝區物件。
1.2重要屬性和方法
ByteBuffer物件三個重要屬性 position, limit和capacity。其中capacity表示了緩衝區的總容量,始終保持不變,初始時候position 等於 0 , limit 等於 capacity
1) put:向緩衝區放入資料
|
put |
ByteBuffer |
put |
ByteBuffer |
put |
呼叫put方法前,limit應該等於capacity,如果不等於,幾乎可以肯定我們對緩衝區的操作有誤。在put方法中0到position-1的區域表示有效資料,position到limit之間區域表示空閒區域。put方法會從position的當前位置放入資料,每放入一個數據position增加1,當position等於limit(即空閒區域使用完)時還繼續放入資料就會丟擲
2)get:從緩衝區讀取資料
|
get |
ByteBuffer |
get |
ByteBuffer |
get |
在get方法中, 0到position-1的區域表示已讀資料,position到limit之間的區域表示未讀取的資料。每讀取一個數據position增加1,當position等於limit時繼續讀取資料就會丟擲BufferUnderflowException異常。
2)flip :將寫模式轉換成讀模式
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
3)clear:清空緩衝區,將讀模式轉換寫模式
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
4)compact:保留未讀取的資料,將讀模式轉換寫模式
public ByteBuffer compact() { int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); unsafe.copyMemory(ix(pos), ix(0), (long)rem << 0); position(rem); limit(capacity()); discardMark(); return this; }
5)mark:儲存當前position的位置到mark變數
public final Buffer mark() { mark = position; return this; }
6)rest:將position置為mark變數中的值
public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
mark方法和rest方法聯合使用可實現從指定位置的重讀。
7)rewind:從頭開始重讀
public final Buffer rewind() { position = 0; mark = -1; return this; }
ByteBuffer物件使用時又很多需要注意的地方,自認為這個API設計的不是很友好。比如一定不能連續兩次呼叫flip和compact方法,flip方法呼叫以後不能再呼叫put方法,等等。要避免這些錯誤,只能在使用ByteBuffer前弄清楚當前緩衝區中0到position-1以及position到limit中資料表示的含義,這才是避免bug的根本辦法。
從上面的介紹中我們可以看出,ByteBuffer物件既可以讀,也可以寫。除非我們能保證在讀操作一次性使用完ByteBuffer物件中的所有資料,並且保證寫入ByteBuffer物件向中的內容全部寫入完成,否則同時用於讀寫的ByteBuffer物件會造成資料的混亂和錯誤。一般來說,我們都會建立兩個ByteBuffer物件向,一個用於接收資料,另一個用於傳送資料。
1.3其它方法
ByteBuffer是面向位元組的,為方便基本資料型別的讀取,ByteBuffer中還提供getInt,putInt,getFloat,putFloat等方法,這些方法方便我們在緩衝區存取單個基本資料型別。如果需要從基本資料型別陣列中寫入到ByteBuffer中,或者從ByteBuffer中讀取到基本資料型別的陣列中,那麼我們可以通過已建立好的ByteBuffer物件的asXxxBuffer方法建立基本資料型別的Buffer。
|
asCharBuffer |
|
asDoubleBuffer |
|
asFloatBuffer |
|
asIntBuffer |
|
asLongBuffer |
假設有如下程式碼
IntBuffer intBufferObj = byteBufferObj.asIntBuffer();
此時intBufferObj和byteBufferObj物件共享底層的陣列。但是比較坑爹的是兩個buffer的position,limit是獨立的,這樣極易產生bug,需要引起我們注意。
1.4 ByteBuffer的編碼和解碼
資料傳輸中我們使用的是ByteBuffer物件作為緩衝區,如果在通道兩端我們通訊的內容是文字資料,這就涉及到ByteBuffer與CharBuffer的轉換。我們可以使用Charset類實現這個轉換的功能。
1)解碼示例
ByteBuffer byteBuffer = ByteBuffer.allocate(128); byteBuffer.put(new byte[]{-26, -120, -111, -25, -120, -79, -28, -67, -96}); byteBuffer.flip(); /*對獲取utf8的編解碼器*/ Charset utf8 = Charset.forName("UTF-8"); CharBuffer charBuffer = utf8.decode(byteBuffer);/*對bytebuffer中的內容解碼*/ /*array()返回的就是內部的陣列引用,編碼以後的有效長度是0~limit*/ char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit()); System.out.println(charArr); /*執行結果:我愛你*/
2)編碼示例
CharBuffer charBuffer = CharBuffer.allocate(128); charBuffer.append("我愛你"); charBuffer.flip(); /*對獲取utf8的編解碼器*/ Charset utf8 = Charset.forName("UTF-8"); ByteBuffer byteBuffer = utf8.encode(charBuffer); /*對charbuffer中的內容解碼*/ /*array()返回的就是內部的陣列引用,編碼以後的有效長度是0~limit*/ byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit()); System.out.println(Arrays.toString(bytes)); /*執行結果:[-26, -120, -111, -25, -120, -79, -28, -67, -96] */
我們還可以通過程式碼中的utf8編解碼器分別獲取編碼器物件和解碼器物件
CharsetEncoder utf8Encoder = utf8.newEncoder(); CharsetDecoder utf8Decoder = utf8.newDecoder();
然後通過下面編碼器和解碼器提供的方法進行編解碼,其中一些方法可以使ByteBuffer和CharBuffer物件迴圈使用,不必每次都產生一個新的物件。
解碼器方法
CharBuffer |
decode Convenience method that decodes the remaining content of a single input byte buffer into a newly-allocated character buffer. |
CoderResult |
decode Decodes as many bytes as possible from the given input buffer, writing the results to the given output buffer. |
|
decodeLoop Decodes one or more bytes into one or more characters. |
編碼器方法
|
encode Convenience method that encodes the remaining content of a single input character buffer into a newly-allocated byte buffer. |
|
encode Encodes as many characters as possible from the given input buffer, writing the results to the given output buffer. |
|
encodeLoop Encodes one or more characters into one or more bytes. |
注意encode和decode方法都會改變源buffer中的position的位置,這點也是容易產生bug的方法。
2. Channel
針對四種不同的應用場景,有四種不同型別的Channel物件。
型別 |
應用場景 |
是否阻塞 |
FileChannel |
檔案 |
阻塞 |
DatagramChannel |
UDP協議 |
阻塞或非阻塞 |
SocketChannel |
TCP協議 |
阻塞或非阻塞 |
ServerSocketChannel |
用於TCP伺服器端的監聽和連結 |
阻塞或非阻塞 |
Channel物件的建立都是通過呼叫內部的open靜態方法實現的,此方法是執行緒安全的。不論哪種型別的Channel物件,都有read(要理解為從通道中讀取,寫入緩衝區中)和write(要理解為從緩衝區中讀取資料,寫入到通道中)方法,而且read和write方法都只針對ByteBuffer物件。
當我們要獲取由通道傳輸過來的資料時,先呼叫channel.read(byteBufferObj)方法,這個方法在內部呼叫了byteBufferObj物件的put方法,將通道中的資料寫入緩衝區中。當我們要獲取由通道傳輸來的資料時,呼叫byteBufferObj.flip(),然後呼叫byteBufferObj的get方法獲取通道傳過來的資料,最後呼叫clear或compact方法轉換成寫模式,為下次channel.read做準備。
當我們要向通道傳送資料時,先調channel.write(byteBufferObj)方法,這個方法內部呼叫了byteBufferObj的get方法獲取資料,然後將資料寫入通道中。當寫入完成後呼叫clear或compact方法轉換成寫模式,為下次channel.write寫入緩衝區取做準備。
2.1 FileChannel
在檔案通道中read和write方法都是阻塞的,對於read方法,除非遇到檔案結束,否則會把緩衝區的剩餘空間讀滿再返回。對於write方法,會一次性把緩衝區中的內容全部寫入到檔案中才會返回。
下面的程式碼展示了FileChannel的功能,首先向文字檔案中寫入utf8格式的中英文混合字元,然後再讀取出來。讀寫過程中都涉及到編解碼問題。
package nioDemo; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; package nioDemo; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.file.Path; import java.nio.file.Paths; public class FileChannelDemo { public static void main(String[] args){ /*建立檔案,向檔案中寫入資料*/ try { /*如果檔案不存在,建立該檔案,檔案字尾是不是文字檔案不重要*/ File file = new File("E:/noi_utf8.data"); if(!file.exists()){ file.createNewFile(); } /*根據檔案輸出流建立與這個檔案相關的通道*/ FileOutputStream fos = new FileOutputStream(file); FileChannel fc = fos.getChannel(); /*建立ByteBuffer物件, position = 0, limit = 64*/ ByteBuffer bb = ByteBuffer.allocate(64); /*向ByteBuffer中放入字串UTF-8的位元組, position = 17, limit = 64*/ bb.put("Hello,World 123 \n".getBytes("UTF-8")); /*flip方法 position = 0, limit = 17*/ bb.flip(); /*write方法使得ByteBuffer的position到 limit中的元素寫入通道中*/ fc.write(bb); /*clear方法使得position = 0, limit = 64*/ bb.clear(); /*下面的程式碼同理*/ bb.put("你好,世界 456".getBytes("UTF-8")); bb.flip(); fc.write(bb); bb.clear(); fos.close(); fc.close(); } catch (FileNotFoundException e) { } catch (IOException e) { System.out.println(e); } /*從剛才的檔案中讀取字元序列*/ try { /*通過Path物件建立檔案通道*/ Path path = Paths.get("E:/noi_utf8.data"); FileChannel fc = FileChannel.open(path); ByteBuffer bb = ByteBuffer.allocate((int) fc.size()+1); Charset utf8 = Charset.forName("UTF-8"); /*阻塞模式,讀取完成才能返回*/ fc.read(bb); bb.flip(); CharBuffer cb = utf8.decode(bb); System.out.print(cb.toString()); bb.clear(); fc.close(); } catch (IOException e) { e.printStackTrace(); } } }
2.2 ServerSocketChannel
伺服器端用於建立TCP連線的通道,只能對accept事件感興趣。accept方法會返回一個已和客戶端連線好的SocketChannel通道,它才伺服器是真正傳輸資料的通道。
2.3 SocketChannel
TCP客戶端和TCP伺服器端都用它來傳輸資料。
客戶端必須呼叫connect方法去連線伺服器。在非阻塞通模式中,該方法將當前通道加入到選擇器的已註冊集合中,然後通過非同步方式進行建立TCP連線,然後該方法立刻返回。注意呼叫該方法後並不表示已經建立好了TCP連線,如果這個方法返回false,稍後必須呼叫finishConnect方法來完成客戶端到伺服器的tcp連線。在阻塞方式中,connect方法會阻塞直到建立好了TCP連線。
finishConnect在非阻塞模式中僅僅是返回連線的狀態。返回true時,表示連線建立好了。在阻塞模式下,直接呼叫方法connect即可完成連線,不需要使用finishConnect。
非阻塞模式下,讀寫操作要配合選擇器一起使用。在阻塞模式下,建立好TCP連線後就可以直接對通道進行讀寫操作。
2.4 DatagramChannel
connect方法僅用於客戶端到伺服器端的連線,連線的作用僅僅是避免每次傳送和接受資料時的安全檢查,提高發送和接受資料的效率,而不是像TCP連線那樣表示握手的意思。客戶端通道只有呼叫了connect方法後,才能使用read和write方法讀寫資料。
客戶端也可以不事先呼叫connet方法,而直接使用receive方法和send方法來實現資料的收發。
|
receive |
|
send |
2.5 伺服器端DatagramChannel和SocketChannel的區別
對於伺服器端DatagramChannel(UDP)和SocketChannel(TCP)有明顯的區別,對於TCP連線,伺服器端每建立一個連線就對應一個通道(不同的客戶端ip:port地址對應一個通道),而伺服器端UDP的連線始終只有一個通道,所有客戶端傳送過來的報文都存放於同一個緩衝區中,這顯然會降低伺服器端的效率,好在DatagramChannel物件是執行緒安全的,可以用多個執行緒讀寫同一個UDP通道。
伺服器端為什麼只有一個通道呢?我猜想因為UDP是無狀態的,不知道什麼時客戶端會發送資料,什麼時候資料又傳送完成,所以伺服器端沒有辦法為每個客戶端建立一個通道,就算伺服器端根據客戶端ip:port為每個客戶端建立了通道,伺服器端也不知道什麼時候該釋放這個通道,這就造成了資源的浪費。
4. Selector
Selector類表示選擇器,通過這個類的物件可以選取已就緒的通道和這個通道感興趣的事件。通過靜態open方法建立。
4.1註冊
通道可以通過它的register方法,將通道註冊到選擇器上。
SelectionKey |
register Registers this channel with the given selector, returning a selection key. |
|
register Registers this channel with the given selector, returning a selection key. |
這個該方法會返回一個SeletctKey物件,但在這裡我們通常忽略這個返回值。SeletctionKey物件內部包含了這個註冊的通道和這個通道感興趣的事件(ops引數),以及附帶的物件(由att引數傳遞),這個附帶的物件通常就是和這個通道相關的讀寫緩衝區。
4.2通道的選擇與取消
|
select Selects a set of keys whose corresponding channels are ready for I/O operations. |
|
select Selects a set of keys whose corresponding channels are ready for I/O operations. |
|
selectNow Selects a set of keys whose corresponding channels are ready for I/O operations. |
三個方法的返回值都表示就緒通道的數量。
select()方法是個阻塞方法,有通道就緒才會返回。
select(long timeout),最多阻塞timeout毫秒,即使沒有通道就緒也會返回,若超時返回,則當前執行緒中斷標誌位被設定。若阻塞時間內有通道就緒,就提前返回。
seletor.selectNow(),非阻塞方法。
一個seletor物件內部維護了三個集合。
1)已註冊集合:表示了所有已註冊通道的SelectionKey物件。
2)就緒集合:表示了所有已就緒通道的SelectionKey物件。
3)取消集合:表示了所有需要取消註冊關係的通道的SelectionKey物件。
SelectionKey的cancel方法用於取消通道和選擇器的註冊關係,這個方法只是把表示當前通道的SelectionKey放入取消集合中,下次呼叫select方法時才會真正取消註冊關係。
select方法每次會從已註冊的通道集合中刪除所有已取消的通道的SelectionKey,然後清空已取消的通道集合,最後從更新過的已註冊通道集合中選出就緒的通道,放入已就緒的集合中。每次呼叫select方法,會向已就緒的集合中放入已就緒通道的SelectionKey物件,呼叫selectedKeys 方法就會返回這個已就緒通道集合的引用。當我們處理完一個已就緒通道,該通道對應的SelectionKey物件仍然位於已就緒的集合中,這就要求我們處理一個已就緒的通道後就必須手動從已就緒的集合中刪除它,否則下次呼叫selectedKeys時,已處理過的通道還存在於這個集合中,導致執行緒空轉。這裡也是極易產生bug的。
4.3通道的寫方法注意事項
1)寫方法什麼時候就緒?
寫操作的就緒條件為socket底層寫緩衝區有空閒空間,此時並不代表我們這時有(或者需要將)資料寫入通道。而底層寫緩衝區絕大部分時間都是有空閒空間的,所以當你註冊寫事件後,寫操作基本一直是就緒的。這就導致只要有一個通道對寫事件感興趣,select方法幾乎總是立刻返回的,但是實際上我們可能沒有資料可寫的,所以使得呼叫select方法的執行緒總是空轉。對於客戶端傳送一些資料,客戶端返回一些資料的模型,我們可以在讀事件完成後,再設定通道對寫事件感興趣,寫操作完成後再取消該通道對寫事件的興趣,這樣就可以避免上述問題。
2)如何正確的傳送資料
while(writeBuffer.hasRemaining()){ channel.write(writeBuffer); }
上面傳送資料的通常用的程式碼,當網路狀況良好的情況下,這段程式碼能正常工作。 現在我們考慮一種極端情況,伺服器端寫事件就緒,我們向底層的寫緩衝區寫入一些資料後,伺服器端到客戶端的鏈路出現問題,伺服器端沒能把資料傳送出去,此時底層的寫緩衝區一直處於滿的狀態,假設writeBuffer中仍然還有沒傳送完的資料就會導致while迴圈空轉,浪費CPU資源,同時也妨礙這個selector管理的其它通道的讀寫。
為了解決個問題,我們應該使用下面的方法傳送資料
int len = 0; while(writeBuffer.hasRemaining()){ len = sc.write(writeBuffer); /*說明底層的socket寫緩衝已滿*/ if(len == 0){ break; } }
5. 程式碼示例
下面這個類,後面的程式碼都會用到,它只是兩個緩衝區的包裝
package nioDemo; import java.nio.ByteBuffer; /*自定義Buffer類中包含讀緩衝區和寫緩衝區,用於註冊通道時的附加物件*/ public class Buffers { ByteBuffer readBuffer; ByteBuffer writeBuffer; public Buffers(int readCapacity, int writeCapacity){ readBuffer = ByteBuffer.allocate(readCapacity); writeBuffer = ByteBuffer.allocate(writeCapacity); } public ByteBuffer getReadBuffer(){ return readBuffer; } public ByteBuffer gerWriteBuffer(){ return writeBuffer; } }
5.1 TCP非阻塞示例
1)伺服器端程式碼
package nioDemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Random; import java.util.Set; /*伺服器端,:接收客戶端傳送過來的資料並顯示, *伺服器把上接收到的資料加上"echo from service:"再發送回去*/ public class ServiceSocketChannelDemo { public static class TCPEchoServer implements Runnable{ /*伺服器地址*/ private InetSocketAddress localAddress; public TCPEchoServer(int port) throws IOException{ this.localAddress = new InetSocketAddress(port); } @Override public void run(){ Charset utf8 = Charset.forName("UTF-8"); ServerSocketChannel ssc = null; Selector selector = null; Random rnd = new Random(); try { /*建立選擇器*/ selector = Selector.open(); /*建立伺服器通道*/ ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); /*設定監聽伺服器的埠,設定最大連線緩衝數為100*/ ssc.bind(localAddress, 100); /*伺服器通道只能對tcp連結事件感興趣*/ ssc.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e1) { System.out.println("server start failed"); return; } System.out.println("server start with address : " + localAddress); /*伺服器執行緒被中斷後會退出*/ try{ while(!Thread.currentThread().isInterrupted()){ int n = selector.select(); if(n == 0){ continue; } Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); /*防止下次select方法返回已處理過的通道*/ it.remove(); /*若發現異常,說明客戶端連接出現問題,但伺服器要保持正常*/ try{ /*ssc通道只能對連結事件感興趣*/ if(key.isAcceptable()){ /*accept方法會返回一個普通通道, 每個通道在核心中都對應一個socket緩衝區*/ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); /*向選擇器註冊這個通道和普通通道感興趣的事件,同時提供這個新通道相關的緩衝區*/ int interestSet = SelectionKey.OP_READ; sc.register(selector, interestSet, new Buffers(256, 256)); System.out.println("accept from " + sc.getRemoteAddress()); } /*(普通)通道感興趣讀事件且有資料可讀*/ if(key.isReadable()){ /*通過SelectionKey獲取通道對應的緩衝區*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*通過SelectionKey獲取對應的通道*/ SocketChannel sc = (SocketChannel) key.channel(); /*從底層socket讀緩衝區中讀入資料*/ sc.read(readBuffer); readBuffer.flip(); /*解碼顯示,客戶端傳送來的資訊*/ CharBuffer cb = utf8.decode(readBuffer); System.out.println(cb.array()); readBuffer.rewind(); /*準備好向客戶端傳送的資訊*/ /*先寫入"echo:",再寫入收到的資訊*/ writeBuffer.put("echo from service:".getBytes("UTF-8")); writeBuffer.put(readBuffer); readBuffer.clear(); /*設定通道寫事件*/ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } /*通道感興趣寫事件且底層緩衝區有空閒*/ if(key.isWritable()){ Buffers buffers = (Buffers)key.attachment(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); writeBuffer.flip(); SocketChannel sc = (SocketChannel) key.channel(); int len = 0; while(writeBuffer.hasRemaining()){ len = sc.write(writeBuffer); /*說明底層的socket寫緩衝已滿*/ if(len == 0){ break; } } writeBuffer.compact(); /*說明資料全部寫入到底層的socket寫緩衝區*/ if(len != 0){ /*取消通道的寫事件*/ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } } }catch(IOException e){ System.out.println("service encounter client error"); /*若客戶端連接出現異常,從Seletcor中移除這個key*/ key.cancel(); key.channel().close(); } } Thread.sleep(rnd.nextInt(500)); } }catch(InterruptedException e){ System.out.println("serverThread is interrupted"); } catch (IOException e1) { System.out.println("serverThread selecotr error"); }finally{ try{ selector.close(); }catch(IOException e){ System.out.println("selector close failed"); }finally{ System.out.println("server close"); } } } } public static void main(String[] args) throws InterruptedException, IOException{ Thread thread = new Thread(new TCPEchoServer(8080)); thread.start(); Thread.sleep(100000); /*結束伺服器執行緒*/ thread.interrupt(); } }
2)客戶端程式
package nioDemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Random; import java.util.Set; /*客戶端:客戶端每隔1~2秒自動向伺服器傳送資料,接收伺服器接收到資料並顯示*/ public class ClientSocketChannelDemo { public static class TCPEchoClient implements Runnable{ /*客戶端執行緒名*/ private String name; private Random rnd = new Random(); /*伺服器的ip地址+埠port*/ private InetSocketAddress remoteAddress; public TCPEchoClient(String name, InetSocketAddress remoteAddress){ this.name = name; this.remoteAddress = remoteAddress; } @Override public void run(){ /*建立解碼器*/ Charset utf8 = Charset.forName("UTF-8"); Selector selector; try { /*建立TCP通道*/ SocketChannel sc = SocketChannel.open(); /*設定通道為非阻塞*/ sc.configureBlocking(false); /*建立選擇器*/ selector = Selector.open(); /*註冊感興趣事件*/ int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; /*向選擇器註冊通道*/ sc.register(selector, interestSet, new Buffers(256, 256)); /*向伺服器發起連線,一個通道代表一條tcp連結*/ sc.connect(remoteAddress); /*等待三次握手完成*/ while(!sc.finishConnect()){ ; } System.out.println(name + " " + "finished connection"); } catch (IOException e) { System.out.println("client connect failed"); return; } /*與伺服器斷開或執行緒被中斷則結束執行緒*/ try{ int i = 1; while(!Thread.currentThread().isInterrupted()){ /*阻塞等待*/ selector.select(); /*Set中的每個key代表一個通道*/ Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); /*遍歷每個已就緒的通道,處理這個通道已就緒的事件*/ while(it.hasNext()){ SelectionKey key = it.next(); /*防止下次select方法返回已處理過的通道*/ it.remove(); /*通過SelectionKey獲取對應的通道*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*通過SelectionKey獲取通道對應的緩衝區*/ SocketChannel sc = (SocketChannel) key.channel(); /*表示底層socket的讀緩衝區有資料可讀*/ if(key.isReadable()){ /*從socket的讀緩衝區讀取到程式定義的緩衝區中*/ sc.read(readBuffer); readBuffer.flip(); /*位元組到utf8解碼*/ CharBuffer cb = utf8.decode(readBuffer); /*顯示接收到由伺服器傳送的資訊*/ System.out.println(cb.array()); readBuffer.clear(); } /*socket的寫緩衝區可寫*/ if(key.isWritable()){ writeBuffer.put((name + " " + i).getBytes("UTF-8")); writeBuffer.flip(); /*將程式定義的緩衝區中的內容寫入到socket的寫緩衝區中*/ sc.write(writeBuffer); writeBuffer.clear(); i++; } } Thread.sleep(1000 + rnd.nextInt(1000)); } }catch(InterruptedException e){ System.out.println(name + " is interrupted"); }catch(IOException e){ System.out.println(name + " encounter a connect error"); }finally{ try { selector.close(); } catch (IOException e1) { System.out.println(name + " close selector failed"); }finally{ System.out.println(name + " closed"); } } } } public static void main(String[] args) throws InterruptedException{ InetSocketAddress remoteAddress = new InetSocketAddress("192.168.1.100", 8080); Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress)); Thread tb = new Thread(new TCPEchoClient("thread b", remoteAddress)); Thread tc = new Thread(new TCPEchoClient("thread c", remoteAddress)); Thread td = new Thread(new TCPEchoClient("thread d", remoteAddress)); ta.start(); tb.start(); tc.start(); Thread.sleep(5000); /*結束客戶端a*/ ta.interrupt(); /*開始客戶端d*/ td.start(); } }
5.2 UDP示例
客戶端非阻塞模式,伺服器端阻塞模式
1)伺服器端程式碼(伺服器端只有一個通道,對應一個讀緩衝區,一個寫緩衝區,所以使用非阻塞方式容易發生資料混亂)
package nioDemo; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.DatagramChannel; import java.nio.charset.Charset; public class ServiceDatagramChannelDemo { public static class UDPEchoService implements Runnable{ private int port; public UDPEchoService(int port){ this.port = port; } @Override public void run(){ ByteBuffer readBuffer = ByteBuffer.allocate(256); ByteBuffer writeBuffer = ByteBuffer.allocate(256); DatagramChannel dc = null; try{ /*伺服器端使用預設的阻塞IO的方式*/ dc = DatagramChannel.open(); dc.bind(new InetSocketAddress(port)); System.out.println("service start"); while(!Thread.currentThread().isInterrupted()){ try{ /*先讀取客戶端傳送的訊息,直到讀取到訊息才會返回*/ /*只能呼叫receive方法,因為不知道哪個地址給伺服器發信息,沒法實現呼叫connect方法*/ /*dc是阻塞的,所以receive方法要等到接收到資料才返回*/ SocketAddress clientAddress = dc.receive(readBuffer); readBuffer.flip(); CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer); System.out.println(charBuffer.array()); /*呼叫send方法向客戶端傳送的訊息, *dc是阻塞的,所以直到send方法把資料全部寫入到socket緩衝區才返回*/ writeBuffer.put("echo : ".getBytes()); readBuffer.rewind(); writeBuffer.put(readBuffer); writeBuffer.flip(); dc.send(writeBuffer, clientAddress); readBuffer.clear(); writeBuffer.clear(); }catch(IOException e){ System.out.println("receive from or send to client failed"); } } }catch(IOException e){ System.out.println("server error"); }finally{ try { if(dc != null){ dc.close(); } } catch (IOException e) { } } } } public static void main(String[] args) throws IOException{ new Thread(new UDPEchoService(8080)).start(); } }
2)客戶端程式碼
package nioDemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.charset.Charset; import java.util.Iterator; public class ClientDatagramChannelDemo { public static class UDPEchoClient implements Runnable{ private String name; private InetSocketAddress serviceAddress; public UDPEchoClient(String name, InetSocketAddress serviceAddress){ this.name = name; this.serviceAddress = serviceAddress; } @Override public void run(){ DatagramChannel dc = null; try{ /*每個實際上可以建立多個通道連線同一個伺服器地址, 我們這裡為了演示方便,只建立了一個通道*/ dc = DatagramChannel.open(); /*客戶端採用非阻塞模式*/ dc.configureBlocking(false); /*這裡的連線不是指TCP的握手連線,因為UDP協議本身不需要連線, *這裡連線的意思大概是提前向作業系統申請好本地埠號,以及高速作業系統要傳送的目的 *連線後的UDP通道可以提高發送的效率,還可以呼叫read和write方法接收和傳送資料 *未連線的UDP通道只能呼叫receive和send方法接收和傳送資料*/ dc.connect(serviceAddress); Selector selector = Selector.open(); int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE; dc.register(selector, interest, new Buffers(256, 256)); int i = 0; while(!Thread.currentThread().isInterrupted()){ selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = it.next(); it.remove(); Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); try{ if(key.isReadable()){ dc.read(readBuffer); readBuffer.flip(); CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer); System.out.println(charBuffer.array()); readBuffer.clear(); } if(key.isWritable()){ writeBuffer.put((name + (i++)).getBytes()); writeBuffer.flip(); dc.write(writeBuffer); writeBuffer.clear(); Thread.sleep(500); } }catch(IOException e){ key.cancel(); key.channel().close(); } } } }catch(InterruptedException e){ System.out.println(name + "interrupted"); } catch (IOException e) { System.out.println(name + "encounter connect error"); } finally{ try { dc.close(); } catch (IOException e) { System.out.println(name + "encounter close error"); }finally{ System.out.println(name + "closed"); } } } } public static void main(String[] args){ InetSocketAddress serviceAddress = new InetSocketAddress("192.168.1.100", 8080); UDPEchoClient clientA = new UDPEchoClient("thread a ", serviceAddress); UDPEchoClient clientB = new UDPEchoClient("thread b ", serviceAddress); UDPEchoClient clientC = new UDPEchoClient("thread c ", serviceAddress); new Thread(clientA).start(); new Thread(clientB).start(); new Thread(clientC).start(); } }
6. 參考內容
[3] JDK 8 API 文件