Java網路程式設計——第十章 非阻塞I/O
阿新 • • 發佈:2019-01-10
使用非阻塞IO方式實現chargen收發協議
客戶端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
public class ChargenClient {
public static final int DEFAULT_PORT = 19;
public static final String HOST= "localhost";
public static void main(String[] args) {
try {
SocketAddress address = new InetSocketAddress(HOST, DEFAULT_PORT);
SocketChannel client = SocketChannel.open(address);
// 注意client的通道是阻塞模式,修改為非阻塞模式,則通過
//client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(74);
// 使用通道的方式將緩衝區的資料寫入標準輸出
// Channels.newChannel是構建一個寫入目標流的通道
WritableByteChannel out = Channels.newChannel(System.out);
while (client.read(buffer) != -1) {
// 迴繞緩衝區,該方法將緩衝區準備為資料傳出狀態,輸出通道從資料開頭而不是末尾開始
// 迴繞緩衝區保持緩衝區資料不變,只是準備寫入而不是讀取
buffer.flip();
out.write(buffer);
// 要重用現有緩衝區,再次讀取之前需要清空緩衝區;該方法不會改變緩衝區資料
// 只是簡單重置索引
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ChargenServer {
public static final int DEFAULT_PORT = 19;
// 伺服器端單執行緒處理多客戶端連線
public static void main(String[] args) {
int port = DEFAULT_PORT;
System.out.println("listening for connections on port: " + port);
// 可列印ASCII共95個,由於每列印一行需要左移一個字元,這裡生成92*2個只是為了列印方便
byte[] rotation = new byte[95 * 2];
for (byte i = ' '; i <= '~'; i++) {
rotation[i - ' '] = i;
rotation[i + 95 - ' '] = i;
}
ServerSocketChannel serverChannel;
// Selector只接受非阻塞通道
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
// 要繫結埠,需要先用socket方法獲取ServerSocket的對等端peer
// 然後使用bind繫結埠
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(DEFAULT_PORT);
ss.bind(address);
// 配置serverChannel為非阻塞模式,如果沒有連線則accpet方法立即返回null, 如果不設定,accept方法將一直阻塞直到有連線進入
serverChannel.configureBlocking(false);
// 使用選擇器迭代處理準備好的連線
selector = Selector.open();
// 向選擇器註冊對此通道的監視,對於chargen,只關心伺服器Socket是否準備好接收一個新連線
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
return;
}
while (true) {
try {
// 檢查是否有可操作的通道準備好接受IO操作
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
// 獲取就緒通道的的SelectionKey的集合
Set<SelectionKey> readyKey = selector.selectedKeys();
// 迭代處理所有的key
Iterator<SelectionKey> iterator = readyKey.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 已經處理的key需要從集合中刪除,標記該鍵已經處理
// 若下次該key還再次準備好,還將繼續出現在Set中
iterator.remove();
try {
// 測試該key是否已經準備好接收一個新的socket連線,即此時就緒的是伺服器通道,接收一個新的Socket通道,將其新增到選擇器
if (key.isAcceptable()) {
// 獲取該key建立的channel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接受一個到此server通道的連線
SocketChannel client = server.accept();
System.out.println("accepted from " + client);
// 配置client的通道為非阻塞模式
client.configureBlocking(false);
SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.allocate(74);
buffer.put(rotation, 0, 72);
buffer.put((byte) '\r');
buffer.put((byte) '\n');
// 迴繞緩衝區
buffer.flip();
key2.attach(buffer);
} else if (key.isWritable()) {// 測試該key是否準備好寫操作,即此時就緒的是Socket通道,向緩衝區寫入資料
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (!buffer.hasRemaining()) {
// 用下一行重新填充緩衝區
buffer.rewind();
// 得到上一次首字元
int first = buffer.get();
// 準備改變緩衝區資料
buffer.rewind();
// 尋找rotation中新的首字元位置
int position = first - ' ' + 1;
// 將資料從rotation複製到緩衝區
buffer.put(rotation, position, 72);
// 在緩衝區末尾存放一個行分隔符
buffer.put((byte) '\r');
buffer.put((byte) '\n');
// 迴繞緩衝區,準備寫入
buffer.flip();
}
client.write(buffer);
}
// 沒有就緒通道,選擇器等待就緒通道
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ex) {
}
}
}
}
}
}
緩衝區
在NIO中,所有IO都必須緩衝;同時新模型中不再直接操作輸入流、輸出流寫,而是從緩衝區讀寫資料,注意緩衝區底層可能是位元組陣列,也可能是其他實現
流與通道的區別
1、流基於位元組,按順序一個位元組一個位元組的傳送,處於效能考慮也可以傳送位元組陣列;通道基於塊,傳送緩衝區中的資料塊,在讀寫通道位元組之前,資料必須已經儲存在緩衝區中,且一次讀寫一個緩衝區的資料
2、通道和緩衝區支援同一物件的讀寫
緩衝區型別
Java基本資料型別中,除了boolean外,所有基本型別都有對應的緩衝區,如float對應FloatBuffer,其超類是Buffer,Buffer中的方法為通用方法
緩衝區4個關鍵引數,無論緩衝區型別,都有相同的方法來獲取和設定這些值
1、位置position,即緩衝區中將讀取或者寫入的下一個位置,該值從0開始,最大值為緩衝區大小
public final int position()
public final Buffer position(int newPosition)
2、容量capacity,緩衝區儲存資料最大數目,容量在建立時設定,且設定後不可改變
public final int capacity()
3、限度limit,緩衝區中可訪問資料的末尾位置,只要不改變限度,就無法讀寫超過這個位置的資料,即使緩衝區容量更大
public final int limit()
public final Buffer limit(int newLimit)
4、標記mark,緩衝區中客戶端指定的標記,使得緩衝區能夠記住一個位置並在之後將其返回,緩衝區標記在mark()呼叫之前是未定義的,呼叫時則設定mark=position,reset()方法一樣,設定mark=position,如果標記未定義時呼叫reset(),則丟擲InvalidMaekException,一些緩衝區函式會拋棄已經設定的標記,如rewind()、clear()、flip()總是拋棄標記,如果設定的值比當前的標記小,則拋棄標記
public final Buffer mark()
public final Buffer reset()
注意: 0<= mark<=position<=limit<= capacity
公共buffer中的其他方法
public final Bufferclear(),位置設定為0,限度設定為容量,“清空”緩衝區,注意clear()並沒有刪除資料,只是重置了位置引數,這些資料依然可以使用絕對的get方法或者再改變限度和位置進行讀取
public final Buffer rewind(),位置設定為0,但不改變限度,即允許重新讀取緩衝區
public final Buffer flip(),將限度設定為當前位置,位置設定為0,在希望排空剛剛填充的緩衝區時可以使用這個方法
public final int remaining() 返回position和limit之間的元素數目
public fianl boolean hasRemaining(),如果remaining()返回的數大於0,則返回true
建立緩衝區
緩衝區類的層次是基於繼承的,而非基於多型,在使用緩衝區前,首先需要確定的是需要的緩衝區型別,然後使用對應的工程方法建立特定的實現類;空的緩衝區使用allocate建立,常用於輸入;預填充緩衝區由wrap建立,常用於輸出
allocate方法只用於建立固定容量的空緩衝區,底層基於陣列實現,可以通過array()和arrayOffset()訪問該底層陣列(後備陣列),但需要注意的是,這實際上暴露了buffer的私有資料,對該陣列的任何更改都將反映到緩衝區中,因此在獲取陣列後,就不應該在寫緩衝區
allocateDirect方法(僅用於ByteBuffer),底層使用直接記憶體在網絡卡、核心記憶體等位置建立緩衝區,底層不是陣列,且使用array()將丟擲UnSupportedOperationException;需要注意的是,直接分配應用於持續時間較短的緩衝區,其實現細節依賴JVM,不是必須不應使用
wrap方法,用於包裝已有的陣列形成緩衝區,此時緩衝區包含了陣列(充當了後備陣列)的引用,類似的對此後備陣列的改變會反映到緩衝區,因此在陣列操作完成前,不應使用wrap
填充put與排空get
對於使用相對的put和get,都會影響position,put會使position增大,超出容量丟擲BufferOverflowException,get會使position減小,可能丟擲BufferUnderflowException
對於使用絕對的put和get,即使用index,此時不會影響position,超出範圍則丟擲IndexOutOfBoundsException
批量方法,即以資料塊的方式填充或排空緩衝區,該方法會影響position,對應的put和get都是從當前position填充或排空資料,put在緩衝區沒有足夠空間容納陣列內容則丟擲BufferOverflowException,get在緩衝區沒有足夠資料填充陣列則丟擲BufferUnderflowException
批量方法(一下為ByteBuffer的bulk method,對於IntBuffer,則引數型別為int[]等)
public ByteBuffer get(byte[] det, int offset, int length)
public ByteBuffer get(byte[] det)
public ByteBuffer put(byte[] det, int offset, int length)
public ByteBuffer put(byte[] det)
資料轉換(僅用於ByteBuffer ),提供了將簡單型別引數對應的位元組填充到位元組緩衝區的put方法和通過將適量的位元組轉換為簡單型別的get方法;該方法可以指定位元組順序是Big-endian還是little-endian
檢查和設定緩衝區位元組順序
if (buffer.order().equals(ByteOrder.BIG_ENDIAN)) {
buffer.order(ByteOrder.LITTLE_ENDIAN)
}
檢視緩衝區
如果SocketChannel的ByteBuffer中只包含一種特定的基本型別元素,此時可以建立檢視緩衝區view buffer,他可以從底層ByteBuffer中提取資料,需要注意的是,對檢視緩衝區的修改會反映到底層緩衝區,反之亦然,並且每個緩衝區都有自己獨立的position、mark、limit、capacity
如果以非阻塞模式,需要注意在讀寫上層緩衝區之前,將底層ByteBuffer的資料排空,同時非阻塞模式不能保證在排空資料後仍能以int等型別的邊界對其,即向非阻塞通道中寫入半個位元組是可能出現的,因此在使用非阻塞IO時,在想檢視緩衝區中放入更多資料前,需要確保檢查這個問題
public abstract ShortBuffer asShortBuffer()
public abstract CharBuffer asCharBuffer()
public abstract IntBuffer asIntBuffer()
public abstract LongBuffer asLongBuffer()
public abstract FloatBuffer asFloatBuffer()
public abstract DoubleBuffer asDoubleBuffer()
整型數字伺服器InetgenServer
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class IntgenServer {
public static final int PORT = 1919;
public static void main(String[] args) {
System.out.println("Listening for connections on port " + PORT);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
ByteBuffer output = ByteBuffer.allocate(4);
output.putInt(0);
output.flip();
key2.attach(output);
} else if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
if (!output.hasRemaining()) {
output.rewind();
int value = output.getInt();
output.clear();
output.putInt(value + 1);
output.flip();
}
client.write(output);
}
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ex) {
}
}
}
}
}
}
整型數字客戶端IntgenClient
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.SocketChannel;
public class IntgenClient {
public static final int PORT = 1919;
public static final String host = "localhost";
public static void main(String[] args) {
try {
SocketAddress address = new InetSocketAddress(host, PORT);
SocketChannel client = SocketChannel.open(address);
ByteBuffer buffer = ByteBuffer.allocate(4);
IntBuffer view = buffer.asIntBuffer();
for (int expected = 0; ; expected++) {
client.read(buffer);
int actual = view.get();
// SocketChannel 只能讀寫ByteBuffer,無法讀寫其他型別緩衝區
// 因此每次迴圈需要清空ByteBuffer,否則緩衝區慢程式將終止(這裡使用的阻塞模式)
buffer.clear();
view.rewind();
if (actual != expected) {
System.out.println("Expected " + expected + "; was " + actual);
break;
}
System.out.println(actual);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
壓縮緩衝區,將緩衝區的剩餘資料移到緩衝區開頭,position設定到資料末尾,進而釋放空間從而可以寫入更多資料;壓縮主要用在非阻塞IO的複製(讀取一個通道,再把資料寫入另一個通道)過程中,先讀入,在寫出,然後壓縮,這樣就可以接收更多的資料,利用一個緩衝區就能完成比較隨機的交替讀寫
public abstract ShortBuffer compact()
public abstract CharBuffer compact()
public abstract IntBuffer compact()
public abstract LongBuffer compact()
public abstract ByteBuffer compact()
public abstract DoubleBuffer compact()
public abstract FloatBuffer compact()
Echo伺服器EchoServer
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class EchoServer {
public static final int PORT = 7;
public static void main(String[] args) {
System.out.println("Listening for connections on port " + PORT);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
Set<SelectionKey> ready = selector.selectedKeys();
Iterator<SelectionKey> iterator = ready.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
// 注意迴繞緩衝區!!
output.flip();
client.write(output);
output.compact();
}
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ex) {
}
}
}
}
}
}
複製緩衝區,建立緩衝區副本,從而將相同的資訊傳送到多個通道;複製的緩衝區不是克隆,他們共享相同的資料;每個緩衝區有獨立的position、mark、capacity、limit;對於一個間接緩衝區,他們共享相同的後備陣列,對一個緩衝區的修改會影響另一個緩衝區,因此複製緩衝區主要用於讀取緩衝區,如通過多通道大致並行的傳輸相同的資料;
public abstract ShortBuffer duplicate()
public abstract CharBuffer duplicate()
public abstract IntBuffer duplicate()
public abstract LongBuffer duplicate()
public abstract ByteBuffer duplicate()
public abstract DoubleBuffer duplicate()
public abstract FloatBuffer duplicate()
// 服務端接收一個請求,服務端下發一個檔案
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Set;
public class NoBlockingSingleFileHTTPServer {
private ByteBuffer contentBuffer;
private int port = 80;
public NoBlockingSingleFileHTTPServer(ByteBuffer data, String encoding, String MIMEType, int port) {
this.port = port;
String header = "HTTP/1.0 200 OK\r\n"
+ "Server: NoBlockingSingleFileHTTPServer\r\n"
+ "Content-Length " + data.limit() + "\r\n"
+ "Content-type " + MIMEType + "\r\n"
+ "\r\n";
byte[] headerData = header.getBytes(Charset.forName("UTF-8"));
ByteBuffer buffer = ByteBuffer.allocate(data.limit() + headerData.length);
buffer.put(headerData);
buffer.put(data);
buffer.flip();
this.contentBuffer = buffer;
}
public void run() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(port));
// 下述三行程式碼和上一行程式碼效果相同
// ServerSocket ss = serverChannel.socket();
// SocketAddress add = new InetSocketAddress(port);
// ss.bind(add);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
} else if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (buffer.hasRemaining()) {
client.write(buffer);
} else {
// 結束工作
client.close();
}
} else if (key.isReadable()) {
// 只需要讀取,不需要解析,故下面三行程式碼無實際作用
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4096);
client.read(buffer);
// 通道切換回只寫模式
key.interestOps(SelectionKey.OP_WRITE);
// 每個通道都有自己的緩衝區,所有通道共享相同內容
key.attach(contentBuffer.duplicate());
}
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ex) {
}
}
}
}
}
public static void main(String[] args) {
String file = "C:\\Users\\Administrator\\Desktop\\程式學習\\新建文字文件.txt";
String encoding = "UTF-8";
int port = 80;
try {
String contentType = URLConnection.getFileNameMap().getContentTypeFor(file);
System.out.println(contentType);
Path filePath = FileSystems.getDefault().getPath(file);
System.out.println(filePath);
byte[] data = Files.readAllBytes(filePath);
ByteBuffer input = ByteBuffer.wrap(data);
NoBlockingSingleFileHTTPServer server = new NoBlockingSingleFileHTTPServer(input, encoding, contentType, port);
server.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
分片緩衝區,複製緩衝區的變形,分片緩衝區也會建立一個新的緩衝區,與原緩衝區共享資料,不同之處在於分片的起始位置position=0的位置是原緩衝區的position,分片緩衝區的capacity不大於原緩衝區的limit;倒回分片只能回到原緩衝區的position處;分片緩衝區對於協議資料緩衝區很有用,可以讀出受,然後使用分片緩衝區,然後將對應的新緩衝區傳遞給對應的方法或類
public abstract ShortBuffer slice()
public abstract CharBuffer slice()
public abstract IntBuffer slice()
public abstract LongBuffer slice()
public abstract ByteBuffer slice()
public abstract DoubleBuffer slice()
public abstract FloatBuffer slice()
標記與重置,用於重讀之前的資料
public final Buffer mark() // 設定標記,如果position < 標記位置,則忽略標記
public final Buffer reset() // 如果之前沒有設定mark,則丟擲 InvalidMarkException
緩衝區類的Object方法
equals(),兩個緩衝區資料型別相同、緩衝區剩餘資料個數相同、相同相對位置的資料相同
hashCode(),hashCode的方法是根據equals設計的,即向緩衝區的新增、刪除元素均會改變hashCode,即hashCode是易變的
compareTo(),buffer類實現了Comparable方法,該比較按剩餘資料進行比較,對應元素相等則緩衝區相等;否則按第一個不相等資料返回結果;如果某個緩衝區沒有元素而另一個還有資料,則認為較小的緩衝區小
toString(),除了CharBuffer會返回剩餘的資料,其餘則返回position、limit、capacity
SocketChannel,用於讀寫TCP Socket,資料必須編碼到ByteBuffer,每個SocketChannel都與一個對等端peer相關聯
連線
SocketChannel沒有公共構造方法
public static SocketChannel open(SocketAddress address) throws IOExcsption,該方法為立即建立連線,為阻塞方法,在連線建立或者丟擲異常前不會返回
public static SocketChannel open() throws IOException,該方法不立即建立連線,建立一個未連線Socket,必須使用 connect() 方法建立連線
// 阻塞模式,connect方法會等待連線建立
SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress(host, port);
channel.connect(address);
// 非阻塞模式,connect方法會立即返回
SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress(host, port);
channel.configureBlocking(fase);
channel.connect(address);
注意非阻塞模式,在使用連線前必須帶呼叫
public abstract boolean finishConnect() throws IOException
對於阻塞通道,該方法立即返回 true;連線可用則返回 true;連線不可用則返回 false,連線無法建立則丟擲異常,可以通過呼叫下列方法測試連線建立情況
public abstract boolean isConnect() 連線開啟則返回true
public abstract boolean isConnectPending() 連線在建立過程中則返回true
讀取
讀取SokcetChannel,首選需要建立ByteBuffer,然後使用 public abstract int read(ByteBuffer dst) throws IOException
1、通道會用盡可能多的資料填充緩衝區,然後返回直接
2、遇到流末尾,使用剩餘位元組填充緩衝區,且在下一次呼叫read()返回-1
3、若通道是阻塞的,這個方法至少讀回一個位元組、或者返回-1、或者丟擲異常
4、如果通道是非阻塞的,該方法可能返回0
// 讀取緩衝區,直到緩衝區滿或者遇到流結尾
while (buffer.hasRemaining() && channel.read(buffer) != -1);
// 散佈scatter,從一個源填充多個緩衝區,只要緩衝區陣列的最後一個緩衝區還有空間,就可以繼續讀取
public final long read(ByteBuffer[] dsts) throws IOException
public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException
ByteBuffer[] buffers = new ByteBuffer[2];
buffer[0] = ByteBuffer.allocate(1000);
buffer[1] = ByteBuffer.allocate(1000);
while (buffer[1].hasRemaining() && channel.read(buffers) != -1);
寫入
public abstract int write(ByteBuffer src) throws IOException,如果通道是非阻塞的,則write方法不保證寫入緩衝區的所有內容,但由於緩衝區是基於遊標的,通過反覆呼叫該方法,即可完全寫入資料;聚集Gather,和Scatter類似,Gather是將多個緩衝區的資料寫入一個Socket
public final long write(ByteBuffer[] srcs) throws IOException
public final long write(ByteBuffer[] srcs, int offset, int length) throws IOException
關閉緩衝區
1、public void close() throws IOException,關閉通道並釋放資源;已關閉的通道再次關閉沒有任何效果;試圖讀寫一個已關閉的通道將丟擲異常
2、public boolean isOpen(),檢查通道是否關閉
3、SokcetChannel實現了AutoCloseable
ServerSocketChannel
該類的唯一作用:接收一個連線,無法對ServerSocketChannel進行直接讀寫
ServerSocketChannel.open(),建立一個新的ServerSocketChannel物件,該方法並不是開啟一個Socket,僅僅是建立一個物件,需要再使用Socket()方法獲得對等端ServerSocket,進而對其進行配置;注意這裡使用的工廠方法,不同虛擬機器有著不同的實現,且是使用者不可配置的,相同虛擬機器中open()總是返回相同類的例項
接受連線
public abstract SocketChannel accept() throwws IOException
accept()方法可以工作在阻塞模式和非阻塞模式
阻塞模式,accept()等待入站連線,在建立連線前執行緒無法執行任何操作,該策略適用於需要立即響應每個請求的簡單伺服器,預設阻塞模式
非阻塞模式,沒有入站連線時則返回null,非阻塞模式適用於需要為每個連線完成大量工作的伺服器,非阻塞模式一般結合Selector
accept方法丟擲的IOException的幾個子類異常,用於指示更詳細的問題
ClosedChannelException,關閉後無法重新開啟一個ServerSocketChannel
AsynchronousCloseException,執行accept()時,另一個執行緒關閉了ServerSocketChannel
ClosedByInterruptException,一個阻塞ServerSocketChannel在等待時被另一個執行緒中斷
NotYetBoundException,呼叫了accept()方法之前沒有將ServerSocketChannel的對等端SocketChannel與地址繫結,是執行時異常,不是IOException
SecurityException,安全管理器拒絕應用程式請求的埠
Channels類,用於完成通道和基於IO的流、讀寫器類之間的相互轉換,SocketChannel實現了下述方法,ServerSocketChannel沒有實現
public static InputStream newInputStream(ReadableByteChannel ch)
public static OutputStream newOutputStream(WritableByteChannel ch)
public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)
public static Reader newReader(ReadableByteChannel ch, CharsetDeocder decoder, int minimumBufferCapacity)
public static Reader newReader(ReadableByteChannel ch, String encoding)
public static Writer newWriter(WritableByteChannel ch, String encoding)
// 伺服器處理SOAP請求,通過通道讀取http請求主題,通道轉換為流,在使用SAX解析XML,
SocketChannel channel = server.accept();
processHTTPHeader(channel)
XMLReader parser = XMLReaderFactory.creatXMLReader()
parse.setContentHandler(someContnetHandlerObject)
InputStream in = Channels.newInputStream(channel);
parser.parse(in);
Java 7 非同步通道
AschronousSocketChannel和AschronousServerSocketChannel,有著和SocketChannel與ServerSocketChannel類似的方法,但不是他們的子類,不同之處在於,對非同步通道的讀寫會立即返回,資料由Future或者CompletionHandler進一步處理,connect()和accept()也是非同步的,並返回Future
1、future,適用於以一種特定順序獲取結果
// 網路連線並行執行
SocketAddress address = new InetSocketAddress(host, port);
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> connected = client.connect(address);
ByteBuffer buffer = ByteBuffer.allocate(capacity);
// 等待連線完成
connected.get();
// 從連線處讀取
Future<Integer> future = client.read(buffer);
// 做其他工作
// 等待讀取完成
future.get();
// 迴繞並排空緩衝區
buffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
out.write(buffer);
2、CompletionHandler,適用於獨立處理每個網路連線,例如網路爬蟲;CompletionHandler介面宣告兩個方法,
成功呼叫public void completed(Object exc, Object attachment)
失敗呼叫public void failed(Throwable exc, Object attachment)
class LineHandler implements CompletionHandler<Integer, ByteBuffer> {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// TODO Auto-generated method stub
buffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
try {
out.write(buffer);
} catch (IOException e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
// TODO Auto-generated method stub
System.out.println(exc.getMessage());
}
}
AsynchronousSocketChannle和AsynchronousServerSocketChannel是執行緒安全的,但是讀寫操作時,相同的操作只能有一個執行緒進行,但是讀寫可以同時進行,多執行緒同時讀寫時將丟擲ReadPendingException或者WritePendingException
Java 7 Socket選項
SocketChannel、ServerSocketChannel、AsynchronousSocketChannel、AsynChronousServerSocektChannel、DatagramChannel實現了NetworkChannel介面,主要用於支援各種TCP選項,主要通過以下三個方法設定、獲取支援的選項
<T> T getOption(SocektOption<T> name) throws IOException
<T> NetworkChannel setOption(SoketOption<T> name, T value) throws IOException
Set<SocketOption<?>> supportedOptions()
其中SocketOption類時泛型類,型別引數T的選項為 Integer、Boolean、NetworkInterface,其選項對應的常量在StandardSocketOptions類中定義
import java.io.IOException;
import java.net.SocketOption;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.DatagramChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class OptionSupport {
public static void main(String[] args) throws IOException {
printOptions(SocketChannel.open());
//printOptions(ServerSocketChannel.open());
printOptions(AsynchronousSocketChannel.open());
printOptions(AsynchronousServerSocketChannel.open());
printOptions(DatagramChannel.open());
}
public static void printOptions(NetworkChannel channel) throws IOException {
System.out.println(channel.getClass().getSimpleName() + " supports:");
for (SocketOption<?> option : channel.supportedOptions()) {
System.out.println(option.name() + " : " + channel.getOption(option));
}
System.out.println();
channel.close();
}
}
就緒選擇
將不同的通道註冊到一個Selector物件,每個通道分配一個SelectionKey,程式通過詢問這個Selector物件,哪些通道已經準備就緒可以無阻塞的完成操作,可以通過請求Selector物件返回相應的鍵集合
Selector類
1、首選呼叫靜態工廠方法建立選擇器
public static Selector open() thrwos IOException
2、向選擇器增加SelectableChannel通道物件,ops是SelectionKey的命名常量,如果一個通道需要在同一個選擇器中關注對個操作,使用 | 符號;att 是鍵的附件,該物件用於儲存連線的狀態;一個通道可以註冊到多個選擇器
public final SelectionKey register(Selector sel, int ops)
public final SelectionKey register(Selector sel, int ops, Object att)
3、查詢選擇器,選擇就緒的通道
public abstract int selectNow() throws IOException // 非阻塞選擇的方法,若沒有準備好的連線,則立即返回
public abstract int select() throws IOException // 返回前等待,直到至少有一個註冊的通道準備好進行處理
public abstract int select(long timeout) throws IOException // 在返回 0 前至多等到 time out 毫秒,沒有通道準備好則不做任何操作
4、使用SelectedKeys()獲取就緒的通道,然後迭代處理每個準備好的鍵,並將該鍵remove()掉
public abstract Set<SelectionKey> selectedKeys()
5、不再需要選擇器時,需要將其關閉,釋放與選擇器相關的所有資源,取消向選擇器註冊的所有的鍵,中斷被該選擇器的某個方法中斷的執行緒
public abstract void close() throws IOException
Selectionk類,通道指標,並儲存一個儲存通道連線狀態的附件
1、迭代處理SelectionKey時,首先測試這些鍵能進行操作,共四種可能,物件通道關注的四個操作
public final boolean isAcceptable()
public final boolean isConnectable()
public final boolean isReadable()
public final boolean isWritable()
2、使用channel()獲得這個通道
public abstract SelectableChannel channel()
3、若在儲存狀態資訊的SelectionKey儲存了一個物件,使用attachment()方法獲取該物件
public final Object attachment()
4、結束使用連線,則應該撤銷該SelectionKey物件的註冊
public abstract void cancel()