NIO結合Socket程式設計實現
基本概念
Socket又稱“套接字”,應用程式通過“套接字”向網路發出請求或者應答網路請求。Socket和SocketServer類庫位於java.net包中,ServerSocket用於伺服器端,Socket是建立網路連結使用的。在連線成功時,應用程式兩端會產生一個Socket例項,操作這個例項,完成所需的會話。對於一個網路連線來說,套接字是平等的,不因為在伺服器端或在客戶端而產生不同的級別。不管是Socket還是ServerSocket它們的工作都是通過SocketImpl類及其子類完成的。套接字之間的連線過程可以分為四個步驟:
- (1)伺服器監聽:是伺服器端套接字並不定位具體的客戶端套接字,而是出於平等連線的狀態,實時監控網路狀態。
- (2)客戶端請求:指由客戶端的套接字提出連線請求,要連線的目標是伺服器端的套接字。為此,客戶端的套接字必須首先描述它要連線的伺服器套接字,指出伺服器套接字的地址和埠號,然後向伺服器端套接字提出連線請求。
- (3)伺服器連線確認:當伺服器端套接字接收到客戶端的套接字的連線請求,它就響應客戶端套接字的請求,建立一個新的執行緒,把伺服器端套接字的描述發給客戶端。
- (4)客戶端連線確認:一旦客戶端確認了此描述,連線就建立好了,雙方開始進行通訊。而伺服器端套接字出於監聽狀態,繼續接受其他客戶端套接字的連線請求。
IO(BIO)和NIO的區別,其本質就是阻塞和非阻塞的區別。
- 阻塞:應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,那麼程式就一直等待,直到傳輸完畢為止。
- 非阻塞:應用程式直接可以獲取已經準備就緒的資料,無須等待。
IO為同步阻塞形式,NIO為同步非阻塞形式。NIO並沒有實現非同步,在JDK1.7之後,升級了NIO庫包,支援非同步非阻塞通訊模型即NIO2.0(AIO)。同步和非同步一般是面向作業系統與應用程式對IO操作的層面來區別。
- 同步時:應用程式會直接參與IO讀寫操作,並且我們的應用程式會直接阻塞到某一個方法上,直到資料準備就緒;或者採用輪詢的策略實時檢查資料的就緒狀態,如果就緒則獲取資料。
- 非同步時:則所有的IO讀寫操作交給作業系統處理,與我們的應用程式沒有直接關係,我們程式不需要關係IO讀寫,當作業系統完成IO讀寫操作時,會給我們的應用程式傳送通知,我們的應用程式直接拿走資料即可。
同步說的是server服務端的執行方式,阻塞說的是具體的技術,接收資料的方式、狀態(io,nio)。
public class Server {
final static int PORT = 8888;
public static void main(String[] args) {
ServerSocket server = null;
try{
server = new ServerSocket(PORT);
System.out.println("server start...");
//進行阻塞
Socket socket = server.accept();
//新建一個執行緒執行客戶端的任務
new Thread(new ServerHandler(socket)).start();
} catch (Exception e){
e.printStackTrace();
} finally{
if(server != null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
}
}
}
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream());
String body = null;
while(true){
body = in.readLine();
if(null == body) break;
System.out.println("Server: " + body);
out.println(body);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
in.close();
out.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT = 8888;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
//向伺服器端傳送資料
out.println("接受到客戶端的請求資料...");
String response = in.readLine();
System.out.println("Client: " + response);
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
try {
in.close();
out.close();
socket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
傳統的BIO(Blocking IO)程式設計
網路程式設計的基本模型是Client/Sever模型,也就是兩個程序直接進行相互通訊,其中服務端提供配置資訊(繫結埠的ip地址和監聽埠),客戶端通過連線操作箱伺服器端監聽的地址發起連線請求,通過三次握手建立連線,如果連線成功,則雙方即可以進行通訊(網路套接字socket)。
偽非同步IO
採用執行緒池和任務佇列可以實現一種偽非同步IO通訊框架。在學習過連線池和佇列的使用,我們將客戶端的socket封裝成一個task任務(實現Runnable介面的類)然後投遞到執行緒池中去,配置相應的佇列進行實現。
/**
* 修改Server,增加HandlerExecutorPool處理資料
* */
public class Server {
final static int PORT = 8888;
public static void main(String[] args) {
ServerSocket server = null;
Socket socket = null;
try{
server = new ServerSocket(PORT);
System.out.println("server start...");
HandlerExecutorPool executorPool = new HandlerExecutorPool(50,1000);
while(true){
socket = server.accept();
executorPool.execute(new ServerHandler(socket));
}
} catch (Exception e){
e.printStackTrace();
} finally{
if(server != null){
try {
server.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
socket = null;
}
}
}
public class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize,int queueSize){
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
NIO程式設計
NIO(Non-Block IO),即非阻塞IO。它包含以下幾個概念:
- Buffer(緩衝區)
- Channel(管道、通道)
- Selector(選擇器、多路複用器)
Buffer
Buffer是一個物件,它包含一些要寫入或者讀取的資料。在NIO類庫中加入Buffer對夏寧,體現了新庫與原IO的一個重要區別。在面向流的IO中,可以將資料直接寫入或讀取到Stream物件中。在NIO庫中,所有的資料都是用緩衝區處理的。緩衝區實質上是一個數組,通常它是一個位元組陣列(ByteBuffer),也可以使用其他型別的陣列。這個陣列為緩衝區提供了資料的訪問讀寫等操作屬性,如位置、容量、上限等概念。我們常用的就是ByteBuffer,實際上每一種java基本型別都對應了一種緩衝區(除了Boolean型別)。
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
public class TestBuffer {
public static void main(String[] args) {
// 1 基本操作
//建立指定長度的緩衝區
IntBuffer buf = IntBuffer.allocate(10);
buf.put(13);// position位置:0 - > 1
buf.put(21);// position位置:1 - > 2
buf.put(35);// position位置:2 - > 3
//把位置復位為0,也就是position位置:3 - > 0
buf.flip();
System.out.println("使用flip復位:" + buf);
System.out.println("容量為: " + buf.capacity()); //容量一旦初始化後不允許改變(warp方法包裹陣列除外)
System.out.println("限制為: " + buf.limit()); //由於只裝載了三個元素,所以可讀取或者操作的元素為3 則limit=3
System.out.println("獲取下標為1的元素:" + buf.get(1));
System.out.println("get(index)方法,position位置不改變:" + buf);
buf.put(1, 4);
System.out.println("put(index, change)方法,position位置不變:" + buf);;
for (int i = 0; i < buf.limit(); i++) {
//呼叫get方法會使其緩衝區位置(position)向後遞增一位
System.out.print(buf.get() + "\t");
}
System.out.println("buf物件遍歷之後為: " + buf);
// 2 wrap方法使用
/**
// wrap方法會包裹一個數組: 一般這種用法不會先初始化快取物件的長度,因為沒有意義,最後還會被wrap所包裹的陣列覆蓋掉。
// 並且wrap方法修改緩衝區物件的時候,陣列本身也會跟著發生變化。
int[] arr = new int[]{1,2,5};
IntBuffer buf1 = IntBuffer.wrap(arr);
System.out.println(buf1);
IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2);
//這樣使用表示容量為陣列arr的長度,但是可操作的元素只有實際進入快取區的元素長度
System.out.println(buf2);
*/
// 3 其他方法
/**
IntBuffer buf1 = IntBuffer.allocate(10);
int[] arr = new int[]{1,2,5};
buf1.put(arr);
System.out.println(buf1);
//一種複製方法
IntBuffer buf3 = buf1.duplicate();
System.out.println(buf3);
//設定buf1的位置屬性
//buf1.position(0);
buf1.flip();
System.out.println(buf1);
System.out.println("可讀資料為:" + buf1.remaining());
int[] arr2 = new int[buf1.remaining()];
//將緩衝區資料放入arr2陣列中去
buf1.get(arr2);
for(int i : arr2){
System.out.print(Integer.toString(i) + ",");
}
*/
}
}
Channel
通道(Channel),他就像自來水管道一樣,網路資料通過Channel讀取和寫入,通道與流不同之處在於通道是雙向的,而流只是一個方向上流動(一個流必須是InputStream或者OutputStream的子類),而通道可以用於讀、寫或者二者同時進行,最關鍵的是可以與多路複用器結合起來,有多種狀態為,方便多路複用器去識別。事實上通道分為兩大類,一類是網路讀寫的(SelectableChannel),一類是用於檔案操作的(FileChannel),我們使用的SocketChannel和ServerSocketChannel都是SelectableChannel的子類。
Selector
多路複用器(Selector),它是NIO程式設計的基礎,非常重要。多路複用器提供選擇已經就緒的任務的能力。簡單說,就是Selector會不斷地輪詢註冊在其上的通道(Channel),如果某個通道發生了讀寫操作,這個通道就出於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作。一個多路複用器(Selector)可以輔助成千上萬的Channel通道,沒有上限,這也是JDK使用了epoll代替了傳統的select實現,獲得連線控制代碼沒有限制。這也就意味著我們只要一個執行緒負責Selector的輪詢,就可以接入成千上萬個客戶端,這是JDK NIO庫的巨大進步。 Selector執行緒就類似一個管理者(Master),管理了成千上萬個管道,然後輪詢哪個管道的資料已經準備好,通知cpu執行IO的讀取或寫入操作。Selector模式:當IO事件(管道)註冊到選擇器以後,Selector會分配個每個管道一個key值,相當於標籤。Selector選擇器是以輪詢的方式進行查詢註冊所有IO事件(管道),當我們的IO事件(管道)準備就緒後,Selector就會識別,會通過key值來找到對應的管道,進行相關的資料處理操作(從管道里讀或寫資料,寫到我們的資料緩衝區中)。 每個管道都會對選擇器進行註冊不同的時間狀態,以便於選擇器查詢:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
/**
* 實現Runnable介面是為了註冊到Selector中一直處於輪詢的狀態
* */
public class Server implements Runnable {
//1.多路複用器(管理所有的通道)
private Selector selector;
//2.建立緩衝區
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1.開啟多路複用器
this.selector = Selector.open();
//2.開啟服務端通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3.設定服務端通道為非阻塞模式
ssc.configureBlocking(false);
//4.繫結地址
ssc.bind(new InetSocketAddress(port));
//5.把服務端通道註冊到多路複用器上,並且監聽阻塞事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, prot: " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//1.讓多路複用器開始監聽
this.selector.select();
//2.返回多路複用器已經選擇的結果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//3.進行響應
while(keys.hasNext()){
//4.獲取一個選擇元素
SelectionKey key = keys.next();
//5.直接從容器中移除
keys.remove();
//6.如果key有效
if(key.isValid()){
//7.如果為阻塞狀態
if(key.isAcceptable()){
this.accept(key);//這裡的key就是伺服器端的Channel的key
}
//8.如果為可讀狀態
if(key.isReadable()){
this.read(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key) {
try {
//1.清空緩衝區舊的資料
this.readBuf.clear();
//2.獲取之前註冊的socket通道物件
SocketChannel sc = (SocketChannel) key.channel();
//3.讀取資料
int count = sc.read(this.readBuf);
//4.如果沒有資料
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5.有資料則進行讀取,讀取之前需要進行復位方法(把position和limit進行復位)
this.readBuf.flip();
//6.根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
byte[] bytes = new byte[this.readBuf.remaining()];
//7.接收緩衝區資料
this.readBuf.get(bytes);
//8.列印結果
String body = new String(bytes).trim();
System.out.println("Server: " + body);
//9.寫回給客戶端
//...
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1.獲取服務端通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2.執行客戶端Channel的阻塞方法
SocketChannel sc = ssc.accept();
//3.設定阻塞模式
sc.configureBlocking(false);
//4.註冊到多路複用器上,並設定讀取標識
sc.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();
}
}
public class Client {
public static void main(String[] args) {
//建立連線地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
//宣告連線通道
SocketChannel sc = null;
//建立緩衝區
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
//開啟通道
sc = SocketChannel.open();
//進行連線
sc.connect(address);
while(true){
//定義一個位元組陣列,然後使用系統錄入的功能
byte[] bytes = new byte[1024];
System.in.read(bytes);
//把資料放到緩衝區
buf.put(bytes);
//復位
buf.flip();
//寫出資料
sc.write(buf);
//清空緩衝區資料
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}