網路程式設計之BIO、NIO、AIO
TCP直連Socket與ServerSocket通訊
Server.java
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; public class Server { final static int PROT = 8765; public static void main(String[] args) { ServerSocket server = null; try { server = new ServerSocket(PROT); System.out.println(" server start .. "); //進行阻塞 Socket socket = server.accept(); //新建一個執行緒執行客戶端的任務 new Thread(new ServerHandler(socket)).start(); } catch (Exception e) { e.printStackTrace(); } finally { if(server != null){ try { server.close(); } catch (IOException e) { e.printStackTrace(); } } server = null; } } }
ServerHandler.java
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; public class ServerHandler implements Runnable{ private Socket socket ; public ServerHandler(Socket socket){ this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); /*Parameters: out An output stream autoFlush A boolean; if true, the println, printf, or format methods will flush the output buffer */ out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while(true){ body = in.readLine(); if(body == null) break; System.out.println("Server :" + body); out.println("伺服器端返回給客戶端的響應資料."); } } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e) { e.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } }
啟動Server
Client.java
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT = 8765; public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(ADDRESS, PORT); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); //向伺服器端傳送資料 out.println("客戶端傳送的的請求測試資料"); String response = in.readLine(); System.out.println("Client: " + response); } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e) { e.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } }
Eclispe的client、server輸出如下:
每次建立連線都要新啟動一個執行緒,而執行緒會佔用一定的資源。如果Client與Server建立的連線很多,就會建立很多的執行緒,ServerSocket所在的機器可能會出現資源逐步趨於耗盡的問題。
TCP建立連線三次握手:
第一次握手:建立連線時,客戶端傳送syn包(syn=j)到伺服器,並進入SYN_SENT狀態,等待伺服器確認;SYN:同步序列編號(Synchronize Sequence Numbers)。
第二次握手:伺服器收到syn包,必須確認客戶的SYN(ack=j+1),同時自己也傳送一個SYN包(syn=k),即SYN+ACK包,此時伺服器進入SYN_RECV狀態;
第三次握手:客戶端收到伺服器的SYN+ACK包,向伺服器傳送確認包ACK(ack=k+1),此包傳送完畢,客戶端和伺服器進入ESTABLISHED(TCP連線成功)狀態,完成三次握手。
三次握手完成後,客戶端與伺服器開始傳送資料。
網路程式設計的基本模型是Client/Server模型,即Client程序與Server程序直接進行相互通訊。伺服器端繫結某個埠並進行監聽,而客戶端通過指定IP、埠號向指定的Server發出連線請求,通過三次握手建立連線,若連線成功則客戶端與伺服器端即可進行相互通訊。
BIO同步阻塞
在JDK1.5之前,採用偽非同步的方式避免Server Socket建立過多的執行緒來處理客戶端的請求,其內部維護著一個執行緒池,將客戶端請求建立的Socket封裝成一個任務Task物件(任務Task類實現Runnable介面),把任務物件交給執行緒池處理,並配置相應的阻塞佇列BlockingQueue用於緩衝任務物件。線上程池中可以設定,用於處理Client建立連線Socket的執行緒池最大執行緒數,這樣就避免了Server Socket端無限制的建立子執行緒去處理每一個Client建立的連線而導致系統資源耗盡,機器宕機的問題。
Client.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT =8765;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("Client request");
String response = in.readLine();
System.out.println("Client:" + response);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
Server.java
在Server Socket端使用自定義執行緒池HandlerExecutorPool,引數50是執行緒池的最大執行緒數,100為ArrayBlockingQueue排隊等待的緩衝佇列長度。針對監聽並建立連線的Socket,經過自定義的ServerHandler包裝後,交給自定義執行緒池進行處理,Server Socket繼續處於accept狀態,監聽來自Client的連線請求。
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
final static int PORT = 8765;
public static void main(String[] args) {
ServerSocket server = null;
BufferedReader in = null;
PrintWriter out = null;
try {
server = new ServerSocket(PORT);
System.out.println("server start");
Socket socket = null;
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
while(true){
socket = server.accept();
executorPool.execute(new ServerHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(server != null){
try {
server.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
server = null;
}
}
}
HandlerExecutorPool.java
由於在Server Socket中傳遞的引數maxPoolSize=50, queueSize=100。建立的ThreadPoolExecutor物件初始化執行緒池時就建立的執行緒數為Runtime.getRuntime().availableProcessors()即JVM可用的處理器數,執行緒池的最大執行緒數為50,空閒時間為120秒,即執行緒池中的某個執行緒若空閒時間超過120秒仍未有新的任務分配給這個執行緒,則這個執行緒會停止,其佔用的資源會被回收。ArrayBlockingQueue是一個基於陣列的阻塞佇列,是一個有界佇列,其內部維護著一個定長陣列,以便緩衝佇列中資料物件,佇列的讀寫未實現分離,因此資料的生產和消費不能完全並行。由於queueSize=100,則該有界佇列的長度為100。
在下面程式碼中,使用的是ArrayBlockingQueue有界佇列,當有新的Socket交給執行緒池處理時,若執行緒池的實際執行緒數小於Runtime.getRuntime().availableProcessors()時,則優先建立執行緒;若當前執行緒數大於Runtime.getRuntime().availableProcessors()則將任務加入到ArrayBlockingQueue佇列中。在佇列已滿情況下,若線上程池的匯流排程數不大於50的前提下,建立新的執行緒處理當前這個新任務;若執行緒池的執行緒數已達到50個,則對新任務執行拒絕策略。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
ServerHandler.java
在上述Server.java中存在程式碼executorPool.execute(new ServerHandler(socket));,將經過ServerHandler包裝的Socket交給執行緒池中執行緒處理。ServerHandler實現了Runnable介面,在run()方法中獲取Client端傳遞給來的資料流,經過處理轉換後輸出,並使用out.println()方法給Client回傳Server Socket端的響應資訊。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler (Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while(true){
body = in.readLine();
if(body == null) break;
System.out.println("Server:" + body);
out.println("Server response");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
先啟動Server.java
再啟動Client.java,此時Server對應的console內容如下
Client對應的console內容如下
NIO同步非阻塞
NIO是非阻塞IO,在傳統TCP點對點直接連線的基礎上做了一層封裝,並不是Client與Server直接建立連線,而是Client先到Server端進行管道註冊。在Server端建立一個Selector多路複用器,啟動一個執行緒輪詢註冊到Selector上所有Channel的狀態,根據通道的狀態,執行相關操作。通道的狀態包括:Connect連線狀態、Accept阻塞狀態、Read可讀狀態、Write可寫狀態。NIO程式設計中有3個重要部分:Buffer緩衝區、Channel管道、Selector多路複用器
Buffer緩衝區
在NIO類庫中加入了Buffer物件,它包含一些需要寫入或讀取的資料。在面向流的IO中,可以將資料直接寫入或讀取到Stream物件中。在NIO庫中,所有資料的讀取與寫入都是用緩衝區處理的。緩衝區實際上是一個數組,這個陣列為緩衝區提供了資料的訪問讀寫等操作屬性,如位置、容量、上限等。通常為一個位元組陣列(ByteBuffer),也可以是其它java基本型別(Boolean除外)的陣列,如:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer。
NIO程式設計中,在get()、put(value)、put(array)之後,注意執行Buffer物件的flip()方法,將position復位為0
import java.nio.IntBuffer;
public class TestBuffer {
public static void main(String[] args) {
//建立指定長度的緩衝區
IntBuffer intBuffer = IntBuffer.allocate(10);
intBuffer.put(1);//pos=0值為1
intBuffer.put(2);//pos=1值為2
intBuffer.put(3);//pos=2值為3
intBuffer.put(4);//pos=3值為4
intBuffer.put(5);//pos=4值為5
System.out.println("intBuffer:" + intBuffer);
//位置pos復位為0
intBuffer.flip();
System.out.println("intBuffer執行flip()後:" + intBuffer);
System.out.println("pos:" + intBuffer.position());
System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
//get(index)方法,pos不改變
System.out.println("intBuffer.get(3):" + intBuffer.get(3));
System.out.println("intBuffer執行get(3)後:" + intBuffer);
//put(index, change)方法,pos不改變
intBuffer.put(2, 33);
System.out.println("intBuffer執行put(2, 33)後:" + intBuffer);;
//get()方法,pos改變,pos值加1
for (int i = 0; i < intBuffer.limit(); i++) {
System.out.print(intBuffer.get() + "\t");
}
System.out.println();
System.out.println("intBuffer使用for迴圈遍歷之後: " + intBuffer);
System.out.println("pos:" + intBuffer.position());
System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
//wrap包裹陣列
System.out.println("------wrap包裹陣列------");
int[] array = new int[]{6,7,8,9,10};
IntBuffer wrapIntBuffer = IntBuffer.wrap(array);
System.out.println("wrapIntBuffer:"+wrapIntBuffer);
for (int i = 0; i < wrapIntBuffer.limit(); i++) {
System.out.print(wrapIntBuffer.get() + "\t");
}
System.out.println();
System.out.println("wrapIntBuffer使用for迴圈遍歷之後: " + wrapIntBuffer);
//pos復位為0
wrapIntBuffer.flip();
//修改wrapIntBuffer下標2位置處的8為88
wrapIntBuffer.put(2,88);
System.out.println("pos:" + wrapIntBuffer.position());
System.out.println("lim:" + wrapIntBuffer.limit());//intBuffer中已放置元素的個數
System.out.println("cap:" + wrapIntBuffer.capacity());//intBuffer容量
System.out.print("wrapIntBuffer使用for迴圈遍歷:");
for (int i = 0; i < wrapIntBuffer.limit(); i++) {
System.out.print(wrapIntBuffer.get() + "\t");
}
System.out.println();
System.out.print("被wrap包裹的array內容發生了改變:");
for(int j=0;j<array.length;j++){
System.out.print(array[j]+"\t");
}
//複製方法
System.out.println();
System.out.println("------複製方法------");
IntBuffer intBufferOne = IntBuffer.allocate(10);
intBufferOne.put(array);//pos發生變化
System.out.println("intBufferOne:"+intBufferOne);
intBufferOne.flip();//pos復位
System.out.print("intBufferOne使用for迴圈遍歷:");
for (int i = 0; i < intBufferOne.limit(); i++) {
System.out.print(intBufferOne.get() + "\t");
}
//duplicate()複製
intBufferOne.flip();//pos復位
IntBuffer intBufferTwo = intBufferOne.duplicate();
System.out.println();
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可讀資料為:" + intBufferTwo.remaining());//limit - position
intBufferTwo.position(2);
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可讀資料為:" + intBufferTwo.remaining());//limit - position
}
}
Eclipse的console輸出如下:
Channel通道
網路資料通過Channel通道讀取和寫入,通道與流不同之處在於通道是雙向的,而流(InputStream或OutputStream的子類)只能在一個方向上移動。通道可以用於讀、寫或者兩者同時進行。Channel通道可以與多路複用器結合起來,有多種狀態位,方便多路複用器識別並執行相應操作。
Channel通道分為兩大類:一類是網路讀寫的SelectableChannel,一類是用於檔案操作的FileChannel。SocketChannel和ServerSocketChannel都是SelectableChannel的子類。
Selector多路複用器
它是NIO程式設計的基礎,提供選擇已經就緒任務的能力。當IO事件(管道)註冊到選擇器以後,Selector會分配給每個管道一個key值。Selector會不斷輪詢註冊在其上的通道Channel,如果某個通道發生了讀寫操作,這個通道就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作,從管道中讀取或者寫入資料,寫到資料緩衝區Buffer中。一個多路複用器Selector可以負責成千上萬的Channel通道,JDK使用epoll代替了傳統的select實現,使得獲取連線控制代碼沒有限制。只需要一個執行緒負責Selector輪詢,就可以接入成千上萬的客戶端。
下面程式碼,在Server類的構造方法中,建立ServerSocketChannel物件,將該物件註冊到多路複用器Selector上,並處於阻塞accept狀態。由於Server類實現了Runnable介面,在run()方法中存在while(true)迴圈,在while迴圈體中不論客戶端Channel還是伺服器Channel,都在多路複用器的輪詢的範圍。在輪詢過程中,獲取所有註冊到多路複用器Selector上的key,在這個while(true)首次執行的時候,獲取到的處於阻塞狀態的Channel為伺服器Channel,這個伺服器端Channel執行accept()方法,監聽處於就緒狀態的客戶端Channel,將客戶端Channel通道註冊到多路複用器Selector上,並監聽其讀標示位。在存在客戶端Channel註冊到Selector的情況下,在while(true)迴圈體中,若客戶端key處於key.isReadable()為true時,就會執行read()方法。在read方法中,首先將緩衝區清空,獲取呼叫read()方法的客戶端Channel,讀取客戶端Channel中的資料到緩衝區Buffer。
綜合使用Buffer、Channel、Selector的Client端與Server端雙向通訊示例
Server.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server implements Runnable{
//多路複用器(管理所有的通道)
private Selector selector;
//建立讀緩衝區,快取空間大小1024
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//建立寫緩衝區,快取空間大小1024
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//開啟多路複用器
this.selector = Selector.open();
//開啟伺服器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//設定伺服器通道為非阻塞模式
ssc.configureBlocking(false);
//繫結監聽埠
ssc.bind(new InetSocketAddress(port));
//把伺服器通道註冊到多路複用器上,並且監聽阻塞事件
ssc.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//要讓多路複用器開始監聽
this.selector.select();
//返回多路複用器已經選擇的結果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//進行遍歷
while(keys.hasNext()){
//獲取一個選擇的元素
SelectionKey key = keys.next();
//直接從容器中移除就可以了
keys.remove();
//如果是有效的
if(key.isValid()){
//如果為阻塞狀態
if(key.isAcceptable()){
this.accept(key);
}
//如果為可讀狀態
if(key.isReadable()){
this.read(key);
}
//如果為可 寫狀態
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*向SocketChannel中寫資料*/
private void write(SelectionKey key){
SocketChannel sc = (SocketChannel) key.channel();
try {
//定義一個位元組陣列
byte[] bytes = new byte[1024];
//使用系統錄入功能,等待使用者輸入資料並回車
System.in.read(bytes);
//把資料放到緩衝區中
writeBuf.put(bytes);
//對緩衝區進行復位
writeBuf.flip();
//寫出資料給Client端
sc.write(writeBuf);
//清空緩衝區資料
writeBuf.clear();
//因已經執行了向SocketChannel的寫操作,這裡向selector註冊sc通道的讀事件狀態
sc.register(this.selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//清空緩衝區舊的資料
this.readBuf.clear();
//獲取之前註冊的socket通道物件
SocketChannel sc = (SocketChannel) key.channel();
//讀取資料到緩衝區
int count = sc.read(this.readBuf);
//如果沒有資料
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//有資料則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
/*Flips this buffer. The limit is set to the current position and then
the position is set to zero. If the mark is defined then it is discarded.*/
this.readBuf.flip();
//根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用資料個數
//接收緩衝區資料到位元組陣列
this.readBuf.get(bytes);
//列印結果
String body = new String(bytes).trim();
System.out.println("伺服器端接收到客戶端傳送的資訊 : " + body);
//因已經執行了向SocketChannel的讀操作,這裡向selector註冊sc通道的寫事件狀態
sc.register(this.selector,SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//服務通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//阻塞方法
SocketChannel sc = ssc.accept();
//阻塞模式
sc.configureBlocking(false);
//將客戶端通道註冊到多路複用器上,並設定讀取標識
sc.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
在Server.java中,因為ServerSocketChannel物件在Selector上僅僅註冊了SelectionKey.OP_ACCEPT事件狀態,因此Server端建立的一個執行緒,在輪詢Selector過程中,獲取處於就緒狀態的所有Channel通道的集合。Selector分配給ServerSocketChannel物件的唯一key,這個key.isAcceptable()為true則執行accept(key)方法,使這個key對應的伺服器端Channel一直處於accept監聽狀態。
Client.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Client implements Runnable{
//多路複用器(管理所有的通道)
private Selector selector;
//建立寫緩衝區
private ByteBuffer bufferWrite = ByteBuffer.allocate(1024);
//建立讀緩衝區
private ByteBuffer bufferRead = ByteBuffer.allocate(1024);
//建立連線的地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
public Client(){
try {
//開啟多路複用器
this.selector = Selector.open();
//開啟客戶端通道
SocketChannel sc = SocketChannel.open();
//客戶端通道為非阻塞模式
sc.configureBlocking(false);
//多路複用器Selector上,給sc註冊connect事件狀態
sc.register(selector, SelectionKey.OP_CONNECT);
//進行連線
sc.connect(address);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
SocketChannel socketChannel;
while(true){
try {
//要讓多路複用器開始監聽
this.selector.select();
//返回多路複用器已經選擇的結果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//進行遍歷
while(keys.hasNext()){
//獲取一個選擇的元素
SelectionKey key = keys.next();
//直接從容器中移除就可以了
keys.remove();
//如果是有效的
if(key.isValid()){
//如果為連線狀態
if(key.isConnectable()){
System.out.println("client connect");
socketChannel =(SocketChannel)key.channel();
/*Returns:
true if, and only if, a connection operation has been initiated on
this channel but not yet completed by invoking the finishConnect method*/
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
System.out.println("客戶端完成連線操作!");
//把資料放到緩衝區中
bufferWrite.put("Hello,Server".getBytes());
//對緩衝區進行復位
bufferWrite.flip();
//寫出資料給Server端
socketChannel.write(bufferWrite);
//清空寫緩衝區
bufferWrite.clear();
}
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 如果為可讀狀態
if(key.isReadable()){
this.read(key);
}
// 如果為可寫狀態
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
try {
SocketChannel sc = (SocketChannel) key.channel();
byte[] bytes = new byte[1024];
System.in.read(bytes);
//把資料放到緩衝區中
bufferWrite.put(bytes);
//對緩衝區進行復位
bufferWrite.flip();
//寫出資料給Server端
sc.write(bufferWrite);
//清空緩衝區資料
bufferWrite.clear();
sc.register(this.selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//清空緩衝區舊的資料
this.bufferRead.clear();
//獲取之前註冊的socket通道物件
SocketChannel sc = (SocketChannel) key.channel();
//讀取資料到緩衝區
int count = sc.read(this.bufferRead);
//如果沒有資料
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//有資料則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
this.bufferRead.flip();
//根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
byte[] bytes = new byte[this.bufferRead.remaining()];//this.readBuf.remaining()可用資料個數
//接收緩衝區資料到位元組陣列
this.bufferRead.get(bytes);
// 列印結果
String body = new String(bytes).trim();
System.out.println("客戶端接收到伺服器端返回的資訊 : " + body);
sc.register(this.selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
Client client=new Client();
new Thread(client).start();//單獨啟動一個執行緒,去輪詢註冊到多路複用器上的所有通道
} catch (Exception e) {
e.printStackTrace();
}
}
}
run as --java application,首先啟動Server,Eclipse的console輸出如下:
再啟動Client,此時Server對應的console如下:
Client對應的console如下:
Client與Server進行互動通訊,互相傳送聊天訊息後,Eclipse的console輸入如下:
AIO
傳統的BIO建立連線需要三次握手,並且在伺服器端建立一個執行緒去處理客戶端請求。在NIO中,客戶端Channel通道註冊到多路複用器Selector上,減少了三次握手的過程,在伺服器端只需要一個執行緒去輪詢註冊到多路複用器上的Channel的狀態位,根據不同狀態位執行不同的操作。
JDK1.7之後,AIO在之前NIO的基礎上引入非同步通道的概念,並提供了非同步檔案和非同步套接字通道的實現,實現了非同步非阻塞。AIO不需要通過多路複用器來對註冊的通道進行輪詢操作,即可實現非同步讀寫,簡化了NIO程式設計模型。相對於NIO中使用的SocketChannel、ServerSocketChannel,AIO中使用的是AsynchronousSocketChannel、AsynchronousServerSocketChannel。
Server.java
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.Channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
//執行緒池
private ExecutorService executorService;
//通道group
private AsynchronousChannelGroup channelGroup;
//非同步伺服器通道
public AsynchronousServerSocketChannel assc;
public Server(int port){
try {
//建立一個快取池,注意不要使用FixedThreadPool,否則只能接受指定數量的併發客戶端請求
executorService = Executors.newCachedThreadPool();
//建立非同步channel group,1代表初始化執行緒的數量
/*Creates an asynchronous channel group with a given thread
pool that creates new threads as needed.*/
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//建立非同步伺服器通道
/*Opens an asynchronous server-socket channel.*/
assc = AsynchronousServerSocketChannel.open(channelGroup);
//進行繫結監聽埠
assc.bind(new InetSocketAddress(port));
System.out.println("server start , port : " + port);
//此處不是阻塞,而是繼續向下執行。進行通訊的相關處理操作在ServerCompletionHandler
assc.accept(this, new ServerCompletionHandler());//this指的是Server類的物件
//一直休眠 不讓伺服器執行緒停止
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server server = new Server(8765);
}
}
在Server端建立一個快取執行緒池,伺服器端使用的是AsynchronousServerSocketChannel,使用bind()方法繫結監聽埠,使用如上述程式碼Server.java中accept(this, new ServerCompletionHandler())接收和處理客戶端請求,但是這個accept是一個非同步操作,交給執行緒池去非同步的處理當前這個客戶端操作,而Server.java對應的主執行緒繼續向下執行,所以在程式碼中使用了Thread.sleep(Integer.MAX_VALUE);保持Server對應的執行緒非關閉。
ServerCompletionHandler.java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
//當有一個客戶端接入的時候 直接呼叫Server的accept方法
attachment.assc.accept(attachment, this);//this指的是ServerCompletionHandler物件
read(asc);
}
//AsynchronousSocketChannel為客戶端通道
private void read(final AsynchronousSocketChannel asc) {
//讀取資料
ByteBuffer buf = ByteBuffer.allocate(1024);
//非同步方法,不會阻塞在這,主程式繼續執行下面操作
/*This method initiates an asynchronous read operation to read a sequence of bytes
from this channel into the given buffer. */
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進行讀取之後,重置標識位
attachment.flip();
//獲得讀取的位元組數
System.out.println("Server接收內容位元組數:" + resultSize);
//獲取讀取的資料
String resultData = new String(attachment.array()).trim();
System.out.println("Server接收到的內容:" + resultData);
String response = "收到資料" + resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
/*This method initiates an asynchronous write operation to write a sequence of bytes
to this channel from the given buffer. */
//使用到多執行緒設計模式中的Future,先返回一個Future代理物件。後臺新啟動一個執行緒A,進行資料的寫操作。呼叫get()方法時才真正獲取執行緒A執行任務的結果
asc.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}
Client.java
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
public class Client implements Runnable{
private AsynchronousSocketChannel asc ;
public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1", 8765));
}
public void write(String content){
try {
asc.write(ByteBuffer.wrap(content.getBytes())).get();//呼叫get()方法非同步寫
read();
} catch (Exception e) {
e.printStackTrace();
}
}
private void read() {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println("客戶端接收到的反饋資訊:"+new String(respByte,"utf-8").trim());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1, "c1").start();
new Thread(c2, "c2").start();
new Thread(c3, "c3").start();
Thread.sleep(1000);
c1.write("this is c1");
c2.write("this is c2");
c3.write("this is c3");
}
}
啟動Server,在Eclipse的console輸出如下:
啟動Client,此時伺服器端和客戶端對應的console輸出如下:
阻塞與非阻塞、同步與非同步
阻塞:應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,那麼程式就一直等待,直到資料傳輸完畢為止。
非阻塞:應用程式直接可以獲取已經準備就緒好的資料,無須等待。
IO(BIO)為同步阻塞形式,NIO為同步非阻塞形式。在JDK1.7之後,升級了NIO庫包,支援非同步非阻塞通訊模型NIO2.0(AIO)
同步與非同步一般是面向作業系統與應用程式對IO操作的層面上來區別的
同步:應用程式會直接參與IO讀寫操作,並直接阻塞到某一個方法上,直到資料準備就緒;或者採用輪詢的策略實時檢查資料的就緒狀態,如果就緒則獲取資料。
非同步:所有的IO讀寫操作交給作業系統處理,與應用程式沒有直接關係。當作業系統完成了IO讀寫操作時,會給應用程式發通知,應用程式直接拿走資料即可。
BIO、NIO、AIO三者區別
BIO:它屬於傳統的Socket程式設計,客戶端與伺服器端連線的建立需要經過TCP3次握手的過程。伺服器端ServerSocket首先啟動,指定埠並執行accept()進行阻塞,監聽客戶端的連線請求。若接收到客戶端的連線請求併成功建立連線後,客戶端與伺服器端通過Socket套接字中的資料流進行相互之間的資料通訊。針對每一個成功建立連線的客戶端,伺服器端都會建立一個執行緒去處理這個客戶端的請求,若建立連線的客戶端規模很大的時候,對伺服器端資源是一種嚴重浪費。
NIO:在NIO中引入了Channel通道、Buffer緩衝區、Selector多路複用器的概念,客戶端SocketChannel與伺服器端ServerSocketChannel都需要在Selector多路複用器上進行註冊。在伺服器端會建立一個執行緒對註冊到Selector多路複用器上的所有Channel進行輪詢,輪詢出處於就緒狀態的Channel集合,根據為每個Channel分配的唯一key,獲取具體的某個Channel,並根據其狀態標誌位,進行處理,從Channel中讀取或者寫入資料,寫到Buffer資料緩衝區中。每個管道都會對Selector進行註冊不同的事件狀態,方便Selector查詢,事件狀態包括:SelectionKey.OP_CONNECT連線狀態、SelectionKey.OP_ACCEPT阻塞狀態、SelectionKey.OP_READ可讀狀態、SelectionKey.OP_WRITE可寫狀態。
AIO:使用執行緒池中的執行緒來處理客戶端的請求,針對每一個客戶端的請求,會建立一個處理該任務的物件,如上面ServerCompletionHandler類的物件,來完成讀、寫任務。AIO真正實現了非同步非阻塞。