1. 程式人生 > >網路程式設計之BIO、NIO、AIO

網路程式設計之BIO、NIO、AIO

TCP直連Socket與ServerSocket通訊

Server.java

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

	final static int PROT = 8765;
	
	public static void main(String[] args) {
		
		ServerSocket server = null;
		try {
			server = new ServerSocket(PROT);
			System.out.println(" server start .. ");
			//進行阻塞
			Socket socket = server.accept();
			//新建一個執行緒執行客戶端的任務
			new Thread(new ServerHandler(socket)).start();
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(server != null){
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			server = null;
		}
	}
}

ServerHandler.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class ServerHandler implements Runnable{

	private Socket socket ;
	
	public ServerHandler(Socket socket){
		this.socket = socket;
	}
	
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			/*Parameters:
				out An output stream
				autoFlush A boolean; if true, the println, printf, or format methods will flush the output buffer
            */
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			while(true){
				body = in.readLine();
				if(body == null) break;
				System.out.println("Server :" + body);
				out.println("伺服器端返回給客戶端的響應資料.");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			socket = null;
		}
	}
}

啟動Server


Client.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {

	final static String ADDRESS = "127.0.0.1";
	final static int PORT = 8765;
	
	public static void main(String[] args) {
		
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		
		try {
			socket = new Socket(ADDRESS, PORT);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			
			//向伺服器端傳送資料
			out.println("客戶端傳送的的請求測試資料");
			String response = in.readLine();
			System.out.println("Client: " + response);
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			socket = null;
		}
	}
}

Eclispe的client、server輸出如下:


 

每次建立連線都要新啟動一個執行緒,而執行緒會佔用一定的資源。如果Client與Server建立的連線很多,就會建立很多的執行緒,ServerSocket所在的機器可能會出現資源逐步趨於耗盡的問題。

TCP建立連線三次握手:

第一次握手:建立連線時,客戶端傳送syn包(syn=j)到伺服器,並進入SYN_SENT狀態,等待伺服器確認;SYN:同步序列編號(Synchronize Sequence Numbers)。
第二次握手:伺服器收到syn包,必須確認客戶的SYN(ack=j+1),同時自己也傳送一個SYN包(syn=k),即SYN+ACK包,此時伺服器進入SYN_RECV狀態;
第三次握手:客戶端收到伺服器的SYN+ACK包,向伺服器傳送確認包ACK(ack=k+1),此包傳送完畢,客戶端和伺服器進入ESTABLISHED(TCP連線成功)狀態,完成三次握手。
三次握手完成後,客戶端與伺服器開始傳送資料。

網路程式設計的基本模型是Client/Server模型,即Client程序與Server程序直接進行相互通訊。伺服器端繫結某個埠並進行監聽,而客戶端通過指定IP、埠號向指定的Server發出連線請求,通過三次握手建立連線,若連線成功則客戶端與伺服器端即可進行相互通訊。

BIO同步阻塞

在JDK1.5之前,採用偽非同步的方式避免Server Socket建立過多的執行緒來處理客戶端的請求,其內部維護著一個執行緒池,將客戶端請求建立的Socket封裝成一個任務Task物件(任務Task類實現Runnable介面),把任務物件交給執行緒池處理,並配置相應的阻塞佇列BlockingQueue用於緩衝任務物件。線上程池中可以設定,用於處理Client建立連線Socket的執行緒池最大執行緒數,這樣就避免了Server Socket端無限制的建立子執行緒去處理每一個Client建立的連線而導致系統資源耗盡,機器宕機的問題。

Client.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {
	
	final static String ADDRESS = "127.0.0.1";
	final static int PORT =8765;
	
	public static void main(String[] args) {
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			socket = new Socket(ADDRESS, PORT);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			
			out.println("Client request");
			
			String response = in.readLine();
			System.out.println("Client:" + response);
		}  catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (Exception e1) {
					e1.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e2) {
					e2.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (Exception e3) {
					e3.printStackTrace();
				}
			}
			socket = null;				
		}
	}
}

Server.java

在Server Socket端使用自定義執行緒池HandlerExecutorPool,引數50是執行緒池的最大執行緒數,100為ArrayBlockingQueue排隊等待的緩衝佇列長度。針對監聽並建立連線的Socket,經過自定義的ServerHandler包裝後,交給自定義執行緒池進行處理,Server Socket繼續處於accept狀態,監聽來自Client的連線請求。

import java.io.BufferedReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

	final static int PORT = 8765;

	public static void main(String[] args) {
		ServerSocket server = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			server = new ServerSocket(PORT);
			System.out.println("server start");
			Socket socket = null;
			HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
			while(true){
				socket = server.accept();
				executorPool.execute(new ServerHandler(socket));
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (Exception e1) {
					e1.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e2) {
					e2.printStackTrace();
				}
			}
			if(server != null){
				try {
					server.close();
				} catch (Exception e3) {
					e3.printStackTrace();
				}
			}
			server = null;				
		}
	}
	
}

HandlerExecutorPool.java

由於在Server Socket中傳遞的引數maxPoolSize=50, queueSize=100。建立的ThreadPoolExecutor物件初始化執行緒池時就建立的執行緒數為Runtime.getRuntime().availableProcessors()即JVM可用的處理器數,執行緒池的最大執行緒數為50,空閒時間為120秒,即執行緒池中的某個執行緒若空閒時間超過120秒仍未有新的任務分配給這個執行緒,則這個執行緒會停止,其佔用的資源會被回收。ArrayBlockingQueue是一個基於陣列的阻塞佇列,是一個有界佇列,其內部維護著一個定長陣列,以便緩衝佇列中資料物件,佇列的讀寫未實現分離,因此資料的生產和消費不能完全並行。由於queueSize=100,則該有界佇列的長度為100。

在下面程式碼中,使用的是ArrayBlockingQueue有界佇列,當有新的Socket交給執行緒池處理時,若執行緒池的實際執行緒數小於Runtime.getRuntime().availableProcessors()時,則優先建立執行緒;若當前執行緒數大於Runtime.getRuntime().availableProcessors()則將任務加入到ArrayBlockingQueue佇列中。在佇列已滿情況下,若線上程池的匯流排程數不大於50的前提下,建立新的執行緒處理當前這個新任務;若執行緒池的執行緒數已達到50個,則對新任務執行拒絕策略。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HandlerExecutorPool {

	private ExecutorService executor;
	public HandlerExecutorPool(int maxPoolSize, int queueSize){
		this.executor = new ThreadPoolExecutor(
				Runtime.getRuntime().availableProcessors(),
				maxPoolSize, 
				120L, 
				TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(queueSize));
	}
	
	public void execute(Runnable task){
		this.executor.execute(task);
	}
}

ServerHandler.java

在上述Server.java中存在程式碼executorPool.execute(new ServerHandler(socket));,將經過ServerHandler包裝的Socket交給執行緒池中執行緒處理。ServerHandler實現了Runnable介面,在run()方法中獲取Client端傳遞給來的資料流,經過處理轉換後輸出,並使用out.println()方法給Client回傳Server Socket端的響應資訊。

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class ServerHandler implements Runnable {

	private Socket socket;
	public ServerHandler (Socket socket){
		this.socket = socket;
	}
	
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			while(true){
				body = in.readLine();
				if(body == null) break;
				System.out.println("Server:" + body);
				out.println("Server response");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (Exception e1) {
					e1.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e2) {
					e2.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (Exception e3) {
					e3.printStackTrace();
				}
			}
			socket = null;			
		}
	}
}

先啟動Server.java


再啟動Client.java,此時Server對應的console內容如下


Client對應的console內容如下


NIO同步非阻塞

NIO是非阻塞IO,在傳統TCP點對點直接連線的基礎上做了一層封裝,並不是Client與Server直接建立連線,而是Client先到Server端進行管道註冊。在Server端建立一個Selector多路複用器,啟動一個執行緒輪詢註冊到Selector上所有Channel的狀態,根據通道的狀態,執行相關操作。通道的狀態包括:Connect連線狀態、Accept阻塞狀態、Read可讀狀態、Write可寫狀態。NIO程式設計中有3個重要部分:Buffer緩衝區、Channel管道、Selector多路複用器

Buffer緩衝區

在NIO類庫中加入了Buffer物件,它包含一些需要寫入或讀取的資料。在面向流的IO中,可以將資料直接寫入或讀取到Stream物件中。在NIO庫中,所有資料的讀取與寫入都是用緩衝區處理的。緩衝區實際上是一個數組,這個陣列為緩衝區提供了資料的訪問讀寫等操作屬性,如位置、容量、上限等。通常為一個位元組陣列(ByteBuffer),也可以是其它java基本型別(Boolean除外)的陣列,如:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer

NIO程式設計中,在get()、put(value)、put(array)之後,注意執行Buffer物件的flip()方法,將position復位為0

import java.nio.IntBuffer;

public class TestBuffer {
	public static void main(String[] args) {	
		//建立指定長度的緩衝區
		IntBuffer intBuffer = IntBuffer.allocate(10);
		intBuffer.put(1);//pos=0值為1
		intBuffer.put(2);//pos=1值為2
		intBuffer.put(3);//pos=2值為3
		intBuffer.put(4);//pos=3值為4
		intBuffer.put(5);//pos=4值為5
		System.out.println("intBuffer:" + intBuffer);
		//位置pos復位為0
		intBuffer.flip();
		System.out.println("intBuffer執行flip()後:" + intBuffer);
		System.out.println("pos:" + intBuffer.position());
		System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
		System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
		
		//get(index)方法,pos不改變
		System.out.println("intBuffer.get(3):" + intBuffer.get(3));
		System.out.println("intBuffer執行get(3)後:" + intBuffer);
		
		//put(index, change)方法,pos不改變
		intBuffer.put(2, 33);
		System.out.println("intBuffer執行put(2, 33)後:" + intBuffer);;
		
		//get()方法,pos改變,pos值加1
		for (int i = 0; i < intBuffer.limit(); i++) {
			System.out.print(intBuffer.get() + "\t");
		}
		System.out.println();
		System.out.println("intBuffer使用for迴圈遍歷之後: " + intBuffer);
		System.out.println("pos:" + intBuffer.position());
		System.out.println("lim:" + intBuffer.limit());//intBuffer中已放置元素的個數
		System.out.println("cap:" + intBuffer.capacity());//intBuffer容量
		  
		//wrap包裹陣列
		System.out.println("------wrap包裹陣列------");
		int[] array = new int[]{6,7,8,9,10};
		IntBuffer wrapIntBuffer = IntBuffer.wrap(array);
		System.out.println("wrapIntBuffer:"+wrapIntBuffer);
		
		for (int i = 0; i < wrapIntBuffer.limit(); i++) {
			System.out.print(wrapIntBuffer.get() + "\t");
		}
		System.out.println();
		System.out.println("wrapIntBuffer使用for迴圈遍歷之後: " + wrapIntBuffer);
		
		//pos復位為0
		wrapIntBuffer.flip();
		
		//修改wrapIntBuffer下標2位置處的8為88
		wrapIntBuffer.put(2,88);
		System.out.println("pos:" + wrapIntBuffer.position());
		System.out.println("lim:" + wrapIntBuffer.limit());//intBuffer中已放置元素的個數
		System.out.println("cap:" + wrapIntBuffer.capacity());//intBuffer容量
		System.out.print("wrapIntBuffer使用for迴圈遍歷:");
		for (int i = 0; i < wrapIntBuffer.limit(); i++) {
			System.out.print(wrapIntBuffer.get() + "\t");
		}
		System.out.println();
		System.out.print("被wrap包裹的array內容發生了改變:");
		for(int j=0;j<array.length;j++){
			System.out.print(array[j]+"\t");
		}
		
		
		//複製方法
		System.out.println();
		System.out.println("------複製方法------");
		
		IntBuffer intBufferOne = IntBuffer.allocate(10);
		intBufferOne.put(array);//pos發生變化
		System.out.println("intBufferOne:"+intBufferOne);
		intBufferOne.flip();//pos復位
		System.out.print("intBufferOne使用for迴圈遍歷:");
		for (int i = 0; i < intBufferOne.limit(); i++) {
			System.out.print(intBufferOne.get() + "\t");
		}
		
		//duplicate()複製
		intBufferOne.flip();//pos復位
		IntBuffer intBufferTwo = intBufferOne.duplicate();
		System.out.println();
		System.out.println("intBufferTwo:"+intBufferTwo);
		System.out.println("可讀資料為:" + intBufferTwo.remaining());//limit - position
		intBufferTwo.position(2);
		System.out.println("intBufferTwo:"+intBufferTwo);
		System.out.println("可讀資料為:" + intBufferTwo.remaining());//limit - position
		
	}
}

Eclipse的console輸出如下:

Channel通道

網路資料通過Channel通道讀取和寫入,通道與流不同之處在於通道是雙向的,而流(InputStream或OutputStream的子類)只能在一個方向上移動。通道可以用於讀、寫或者兩者同時進行。Channel通道可以與多路複用器結合起來,有多種狀態位,方便多路複用器識別並執行相應操作。

Channel通道分為兩大類:一類是網路讀寫的SelectableChannel,一類是用於檔案操作的FileChannel。SocketChannel和ServerSocketChannel都是SelectableChannel的子類。

Selector多路複用器

它是NIO程式設計的基礎,提供選擇已經就緒任務的能力。當IO事件(管道)註冊到選擇器以後,Selector會分配給每個管道一個key值。Selector會不斷輪詢註冊在其上的通道Channel,如果某個通道發生了讀寫操作,這個通道就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作,從管道中讀取或者寫入資料,寫到資料緩衝區Buffer中。一個多路複用器Selector可以負責成千上萬的Channel通道,JDK使用epoll代替了傳統的select實現,使得獲取連線控制代碼沒有限制。只需要一個執行緒負責Selector輪詢,就可以接入成千上萬的客戶端。

下面程式碼,在Server類的構造方法中,建立ServerSocketChannel物件,將該物件註冊到多路複用器Selector上,並處於阻塞accept狀態。由於Server類實現了Runnable介面,在run()方法中存在while(true)迴圈,在while迴圈體中不論客戶端Channel還是伺服器Channel,都在多路複用器的輪詢的範圍。在輪詢過程中,獲取所有註冊到多路複用器Selector上的key,在這個while(true)首次執行的時候,獲取到的處於阻塞狀態的Channel為伺服器Channel,這個伺服器端Channel執行accept()方法,監聽處於就緒狀態的客戶端Channel,將客戶端Channel通道註冊到多路複用器Selector上,並監聽其讀標示位。在存在客戶端Channel註冊到Selector的情況下,在while(true)迴圈體中,若客戶端key處於key.isReadable()為true時,就會執行read()方法。在read方法中,首先將緩衝區清空,獲取呼叫read()方法的客戶端Channel,讀取客戶端Channel中的資料到緩衝區Buffer。

綜合使用Buffer、Channel、Selector的Client端與Server端雙向通訊示例

Server.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server implements Runnable{
	
	//多路複用器(管理所有的通道)
	private Selector selector;
	//建立讀緩衝區,快取空間大小1024
	private ByteBuffer readBuf = ByteBuffer.allocate(1024);
	//建立寫緩衝區,快取空間大小1024 
	private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
	
	public Server(int port){
		try {
			//開啟多路複用器
			this.selector = Selector.open();
			//開啟伺服器通道
			ServerSocketChannel ssc = ServerSocketChannel.open();
			//設定伺服器通道為非阻塞模式
			ssc.configureBlocking(false);
			//繫結監聽埠
			ssc.bind(new InetSocketAddress(port));
			//把伺服器通道註冊到多路複用器上,並且監聽阻塞事件
			ssc.register(this.selector, SelectionKey.OP_ACCEPT);
			
			System.out.println("Server start, port :" + port);
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		while(true){
			try {
				//要讓多路複用器開始監聽
				this.selector.select();
				//返回多路複用器已經選擇的結果集
				Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
				//進行遍歷
				while(keys.hasNext()){
					//獲取一個選擇的元素
					SelectionKey key = keys.next();
					//直接從容器中移除就可以了
					keys.remove();
					//如果是有效的
					if(key.isValid()){
						//如果為阻塞狀態
						if(key.isAcceptable()){
							this.accept(key);
						}
						//如果為可讀狀態
						if(key.isReadable()){
							this.read(key);
						}
						//如果為可 寫狀態
						if(key.isWritable()){
							this.write(key); 
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/*向SocketChannel中寫資料*/
	private void write(SelectionKey key){
		SocketChannel sc =  (SocketChannel) key.channel();
		try {
			//定義一個位元組陣列
			byte[] bytes = new byte[1024];
			//使用系統錄入功能,等待使用者輸入資料並回車
			System.in.read(bytes);
			//把資料放到緩衝區中
			writeBuf.put(bytes);
			//對緩衝區進行復位
			writeBuf.flip();
			//寫出資料給Client端
			sc.write(writeBuf);
			//清空緩衝區資料
			writeBuf.clear();
			//因已經執行了向SocketChannel的寫操作,這裡向selector註冊sc通道的讀事件狀態
			sc.register(this.selector, SelectionKey.OP_READ);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void read(SelectionKey key) {
		try {
			//清空緩衝區舊的資料
			this.readBuf.clear();
			//獲取之前註冊的socket通道物件
			SocketChannel sc = (SocketChannel) key.channel();
			//讀取資料到緩衝區
			int count = sc.read(this.readBuf);
			//如果沒有資料
			if(count == -1){
				key.channel().close();
				key.cancel();
				return;
			}
			//有資料則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
			/*Flips this buffer. The limit is set to the current position and then 
			the position is set to zero. If the mark is defined then it is discarded.*/ 
			this.readBuf.flip();
			//根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
			byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用資料個數
			//接收緩衝區資料到位元組陣列
			this.readBuf.get(bytes);
			//列印結果
			String body = new String(bytes).trim();
			System.out.println("伺服器端接收到客戶端傳送的資訊 : " + body);
			//因已經執行了向SocketChannel的讀操作,這裡向selector註冊sc通道的寫事件狀態
			sc.register(this.selector,SelectionKey.OP_WRITE); 
			
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}

	private void accept(SelectionKey key) {
		try {
			//服務通道
			ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
			//阻塞方法
			SocketChannel sc = ssc.accept();
			//阻塞模式
			sc.configureBlocking(false);
			//將客戶端通道註冊到多路複用器上,並設定讀取標識
			sc.register(this.selector, SelectionKey.OP_READ);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		
		new Thread(new Server(8765)).start();;
	}
}

在Server.java中,因為ServerSocketChannel物件在Selector上僅僅註冊了SelectionKey.OP_ACCEPT事件狀態,因此Server端建立的一個執行緒,在輪詢Selector過程中,獲取處於就緒狀態的所有Channel通道的集合。Selector分配給ServerSocketChannel物件的唯一key,這個key.isAcceptable()為true則執行accept(key)方法,使這個key對應的伺服器端Channel一直處於accept監聽狀態。

Client.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Client implements Runnable{
	
	//多路複用器(管理所有的通道)
	private Selector selector;
	//建立寫緩衝區
	private ByteBuffer bufferWrite = ByteBuffer.allocate(1024);
	//建立讀緩衝區
	private ByteBuffer bufferRead = ByteBuffer.allocate(1024);
	//建立連線的地址
	InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
	
	public Client(){
		try {
			//開啟多路複用器
			this.selector = Selector.open();
			//開啟客戶端通道
			SocketChannel sc = SocketChannel.open();
			//客戶端通道為非阻塞模式
			sc.configureBlocking(false);
			//多路複用器Selector上,給sc註冊connect事件狀態
			sc.register(selector, SelectionKey.OP_CONNECT);
			//進行連線
			sc.connect(address);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		SocketChannel socketChannel;
		while(true){
			try {
				//要讓多路複用器開始監聽
				this.selector.select();
				//返回多路複用器已經選擇的結果集
				Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
				//進行遍歷
				while(keys.hasNext()){
					//獲取一個選擇的元素
					SelectionKey key = keys.next();
					//直接從容器中移除就可以了
					keys.remove();
					//如果是有效的
					if(key.isValid()){
						//如果為連線狀態
						if(key.isConnectable()){
							System.out.println("client connect");
							socketChannel =(SocketChannel)key.channel();
							/*Returns:
								true if, and only if, a connection operation has been initiated on 
								this channel but not yet completed by invoking the finishConnect method*/
							if(socketChannel.isConnectionPending()){
								socketChannel.finishConnect();
								System.out.println("客戶端完成連線操作!");
								//把資料放到緩衝區中
								bufferWrite.put("Hello,Server".getBytes());
								//對緩衝區進行復位
								bufferWrite.flip();
								//寫出資料給Server端
								socketChannel.write(bufferWrite);
								//清空寫緩衝區
								bufferWrite.clear();
								
							}
							socketChannel.register(selector, SelectionKey.OP_READ);
						}
						// 如果為可讀狀態
						if(key.isReadable()){
							this.read(key);
						}
						// 如果為可寫狀態
						if(key.isWritable()){
							this.write(key); 
						}
					}	
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	private void write(SelectionKey key){
		try {
			SocketChannel sc =  (SocketChannel) key.channel();
			byte[] bytes = new byte[1024];
			System.in.read(bytes);
			//把資料放到緩衝區中
			bufferWrite.put(bytes);
			//對緩衝區進行復位
			bufferWrite.flip();
			//寫出資料給Server端
			sc.write(bufferWrite);
			//清空緩衝區資料
			bufferWrite.clear();
			sc.register(this.selector, SelectionKey.OP_READ);
		} catch (ClosedChannelException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void read(SelectionKey key) {
		try {
			//清空緩衝區舊的資料
			this.bufferRead.clear();
			//獲取之前註冊的socket通道物件
			SocketChannel sc = (SocketChannel) key.channel();
			//讀取資料到緩衝區
			int count = sc.read(this.bufferRead);
			//如果沒有資料
			if(count == -1){
				key.channel().close();
				key.cancel();
				return;
			}
			//有資料則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)
			this.bufferRead.flip();
			//根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
			byte[] bytes = new byte[this.bufferRead.remaining()];//this.readBuf.remaining()可用資料個數
			//接收緩衝區資料到位元組陣列
			this.bufferRead.get(bytes);
			// 列印結果
			String body = new String(bytes).trim();
			System.out.println("客戶端接收到伺服器端返回的資訊 : " + body);
			sc.register(this.selector, SelectionKey.OP_WRITE); 
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		try {
			Client client=new Client();
			new Thread(client).start();//單獨啟動一個執行緒,去輪詢註冊到多路複用器上的所有通道
		} catch (Exception e) {
			e.printStackTrace();
		} 	
	}
}

run as --java application,首先啟動Server,Eclipse的console輸出如下:


再啟動Client,此時Server對應的console如下:


Client對應的console如下:


Client與Server進行互動通訊,互相傳送聊天訊息後,Eclipse的console輸入如下:



AIO

傳統的BIO建立連線需要三次握手,並且在伺服器端建立一個執行緒去處理客戶端請求。在NIO中,客戶端Channel通道註冊到多路複用器Selector上,減少了三次握手的過程,在伺服器端只需要一個執行緒去輪詢註冊到多路複用器上的Channel的狀態位,根據不同狀態位執行不同的操作。

JDK1.7之後,AIO在之前NIO的基礎上引入非同步通道的概念,並提供了非同步檔案和非同步套接字通道的實現,實現了非同步非阻塞。AIO不需要通過多路複用器來對註冊的通道進行輪詢操作,即可實現非同步讀寫,簡化了NIO程式設計模型。相對於NIO中使用的SocketChannel、ServerSocketChannel,AIO中使用的是AsynchronousSocketChannel、AsynchronousServerSocketChannel。

Server.java

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.Channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
	//執行緒池
	private ExecutorService executorService;
	//通道group
	private AsynchronousChannelGroup channelGroup;
	//非同步伺服器通道
	public AsynchronousServerSocketChannel assc;
	
	public Server(int port){
		try {
			//建立一個快取池,注意不要使用FixedThreadPool,否則只能接受指定數量的併發客戶端請求
			executorService = Executors.newCachedThreadPool();
			//建立非同步channel group,1代表初始化執行緒的數量
			/*Creates an asynchronous channel group with a given thread 
			pool that creates new threads as needed.*/
			channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
			//建立非同步伺服器通道
			/*Opens an asynchronous server-socket channel.*/ 
			assc = AsynchronousServerSocketChannel.open(channelGroup);
			//進行繫結監聽埠
			assc.bind(new InetSocketAddress(port));
			
			System.out.println("server start , port : " + port);
			//此處不是阻塞,而是繼續向下執行。進行通訊的相關處理操作在ServerCompletionHandler
			assc.accept(this, new ServerCompletionHandler());//this指的是Server類的物件
			//一直休眠 不讓伺服器執行緒停止
			Thread.sleep(Integer.MAX_VALUE);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		Server server = new Server(8765);
	}
}

在Server端建立一個快取執行緒池,伺服器端使用的是AsynchronousServerSocketChannel,使用bind()方法繫結監聽埠,使用如上述程式碼Server.java中accept(this, new ServerCompletionHandler())接收和處理客戶端請求,但是這個accept是一個非同步操作,交給執行緒池去非同步的處理當前這個客戶端操作,而Server.java對應的主執行緒繼續向下執行,所以在程式碼中使用了Thread.sleep(Integer.MAX_VALUE);保持Server對應的執行緒非關閉。

ServerCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {

	@Override
	public void completed(AsynchronousSocketChannel asc, Server attachment) {
		//當有一個客戶端接入的時候 直接呼叫Server的accept方法
		attachment.assc.accept(attachment, this);//this指的是ServerCompletionHandler物件
		read(asc);
	}

	//AsynchronousSocketChannel為客戶端通道
	private void read(final AsynchronousSocketChannel asc) {
		//讀取資料
		ByteBuffer buf = ByteBuffer.allocate(1024);
		//非同步方法,不會阻塞在這,主程式繼續執行下面操作
		/*This method initiates an asynchronous read operation to read a sequence of bytes 
		from this channel into the given buffer. */
		asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer resultSize, ByteBuffer attachment) {
				//進行讀取之後,重置標識位
				attachment.flip();
				//獲得讀取的位元組數
				System.out.println("Server接收內容位元組數:" + resultSize);
				//獲取讀取的資料
				String resultData = new String(attachment.array()).trim();
				System.out.println("Server接收到的內容:" + resultData);
				String response = "收到資料" + resultData;
				write(asc, response);
			}
			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				exc.printStackTrace();
			}
		});
	}
	
	private void write(AsynchronousSocketChannel asc, String response) {
		try {
			ByteBuffer buf = ByteBuffer.allocate(1024);
			buf.put(response.getBytes());
			buf.flip();
			/*This method initiates an asynchronous write operation to write a sequence of bytes 
			to this channel from the given buffer. */
			//使用到多執行緒設計模式中的Future,先返回一個Future代理物件。後臺新啟動一個執行緒A,進行資料的寫操作。呼叫get()方法時才真正獲取執行緒A執行任務的結果
			asc.write(buf).get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void failed(Throwable exc, Server attachment) {
		exc.printStackTrace();
	}

}

Client.java
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;

public class Client implements Runnable{

	private AsynchronousSocketChannel asc ;
	
	public Client() throws Exception {
		asc = AsynchronousSocketChannel.open();
	}
	
	public void connect(){
		asc.connect(new InetSocketAddress("127.0.0.1", 8765));
	}
	
	public void write(String content){
		try {
			asc.write(ByteBuffer.wrap(content.getBytes())).get();//呼叫get()方法非同步寫
			read();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void read() {
		ByteBuffer buf = ByteBuffer.allocate(1024);
		try {
			asc.read(buf).get();
			buf.flip();
			byte[] respByte = new byte[buf.remaining()];
			buf.get(respByte);
			System.out.println("客戶端接收到的反饋資訊:"+new String(respByte,"utf-8").trim());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		while(true){
			
		}
	}
	
	public static void main(String[] args) throws Exception {
		Client c1 = new Client();
		c1.connect();
		Client c2 = new Client();
		c2.connect();
		Client c3 = new Client();
		c3.connect();
		
		new Thread(c1, "c1").start();
		new Thread(c2, "c2").start();
		new Thread(c3, "c3").start();
		
		Thread.sleep(1000);
		
		c1.write("this is c1");
		c2.write("this is c2");
		c3.write("this is c3");
	}
	
}

啟動Server,在Eclipse的console輸出如下:

啟動Client,此時伺服器端和客戶端對應的console輸出如下:



阻塞與非阻塞、同步與非同步

阻塞:應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,那麼程式就一直等待,直到資料傳輸完畢為止。

非阻塞:應用程式直接可以獲取已經準備就緒好的資料,無須等待。

IO(BIO)為同步阻塞形式,NIO為同步非阻塞形式。在JDK1.7之後,升級了NIO庫包,支援非同步非阻塞通訊模型NIO2.0(AIO)

同步與非同步一般是面向作業系統與應用程式對IO操作的層面上來區別的

同步:應用程式會直接參與IO讀寫操作,並直接阻塞到某一個方法上,直到資料準備就緒;或者採用輪詢的策略實時檢查資料的就緒狀態,如果就緒則獲取資料。

非同步:所有的IO讀寫操作交給作業系統處理,與應用程式沒有直接關係。當作業系統完成了IO讀寫操作時,會給應用程式發通知,應用程式直接拿走資料即可。

BIO、NIO、AIO三者區別

BIO:它屬於傳統的Socket程式設計,客戶端與伺服器端連線的建立需要經過TCP3次握手的過程。伺服器端ServerSocket首先啟動,指定埠並執行accept()進行阻塞,監聽客戶端的連線請求。若接收到客戶端的連線請求併成功建立連線後,客戶端與伺服器端通過Socket套接字中的資料流進行相互之間的資料通訊。針對每一個成功建立連線的客戶端,伺服器端都會建立一個執行緒去處理這個客戶端的請求,若建立連線的客戶端規模很大的時候,對伺服器端資源是一種嚴重浪費。

NIO:在NIO中引入了Channel通道、Buffer緩衝區、Selector多路複用器的概念,客戶端SocketChannel與伺服器端ServerSocketChannel都需要在Selector多路複用器上進行註冊。在伺服器端會建立一個執行緒對註冊到Selector多路複用器上的所有Channel進行輪詢,輪詢出處於就緒狀態的Channel集合,根據為每個Channel分配的唯一key,獲取具體的某個Channel,並根據其狀態標誌位,進行處理,從Channel中讀取或者寫入資料,寫到Buffer資料緩衝區中。每個管道都會對Selector進行註冊不同的事件狀態,方便Selector查詢,事件狀態包括:SelectionKey.OP_CONNECT連線狀態、SelectionKey.OP_ACCEPT阻塞狀態、SelectionKey.OP_READ可讀狀態、SelectionKey.OP_WRITE可寫狀態。

AIO:使用執行緒池中的執行緒來處理客戶端的請求,針對每一個客戶端的請求,會建立一個處理該任務的物件,如上面ServerCompletionHandler類的物件,來完成讀、寫任務。AIO真正實現了非同步非阻塞。