NIO學習-04網路通訊
網路通訊:
傳統的 IO 流都是阻塞式的。也就是說,當一個執行緒呼叫 read() 或 write() 時,該執行緒被阻塞,直到有一些資料被讀取或寫入,該執行緒在此期間不能執行其他任務。因此,在完成網路通訊進行 IO 操作時,由於執行緒會阻塞,所以伺服器端必須為每個客戶端都提供一個獨立的執行緒進行處理,當伺服器端需要處理大量客戶端時,效能急劇下降。例如tomcat低版本的BIO模式就是傳統的IO流網路通訊。
Java NIO 是使用Java的非同步IO技術,不產生阻塞。當執行緒從某通道進行讀寫資料時,若沒有資料可用時,該執行緒可以進行其他任務。執行緒通常將非阻塞 IO 的空閒時間用於在其他通道上執行 IO 操作
NIO 完成網路通訊的三個核心:
1. 通道(Channel):負責連線
java.nio.channels.Channel 介面:
|--SelectableChannel
|--SocketChannel
|--ServerSocketChannel
|--DatagramChannel
|--Pipe.SinkChannel
|--Pipe.SourceChannel
2. 緩衝區(Buffer):負責資料的存取
3. 選擇器(Selector):是 SelectableChannel 的多路複用器
選擇器(Selector)的應用:
建立 Selector :通過呼叫 Selector.open() 方法建立一個 Selector。
向選擇器註冊通道:SelectableChannel.register(Selector sel, int ops)
ops事件型別: SelectionKey 的四個常量表示
- 讀 : SelectionKey.OP_READ(1)
- 寫 : SelectionKey.OP_WRITE(4)
- 連線 : SelectionKey.OP_CONNECT(8)
- 接收 : SelectionKey.OP_ACCEPT(16)
若註冊時不止監聽一個事件,則可以使用“位或”操作符連:
sChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
SelectionKey:表示 SelectableChannel 和 Selector 之間的註冊關係。每次向選擇器註冊通道時就會選擇一個事件(選擇鍵)。選擇鍵包含兩個表示為整數值的操作集。操作集的每一位都表示該鍵的通道所支援的一類可選擇操作.
相關API:
int interestOps() 獲取感興趣事件集合
int readyOps() 獲取通道已經準備就緒的操作的集合
SelectableChannel channel() 獲取註冊通道
Selector selector() 返回選擇器
boolean isReadable() 檢測 Channal 中讀事件是否就緒
boolean isWritable() 檢測 Channal 中
Selector 的常用方法:
Set<SelectionKey> keys() 所有的 SelectionKey 集合。代表註冊在該Selector上的Channel
selectedKeys() 被選擇的 SelectionKey 集合。返回此Selector的已選擇鍵集
int select() 監控所有註冊的Channel,當它們中間有需要處理的 IO 操作時,該方法返回,並將對應得的 SelectionKey 加入被選擇的SelectionKey 集合中,該方法返回這些 Channel 的數量.
int select(long timeout) 可以設定超時時長的 select() 操作
int selectNow() 執行一個立即返回的 select() 操作,該方法不會阻塞執行緒
Selector wakeup() 使一個還未返回的 select() 方法立即返回
void close() 關閉該選擇器
管道 (Pipe):
Java NIO 管道是2個執行緒之間的單向資料連線。Pipe有一個source通道和一個sink通道。資料會被寫到sink通道,從source通道讀取.
阻塞與非阻塞案例分析:
阻塞是:前面一個客戶端沒有執行完,後面的客戶端傳送的訊息無法顯示,因為伺服器還沒有接受完前一個客戶端傳送的訊息,因而後面的客戶端只能處於等待顯示狀態。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.junit.Test;
/**
*TCP
*/
public class TestBlockingNIO2 {
//客戶端
@Test
public void client() throws IOException{
//1. 獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
//2. 分配指定大小的緩衝區
ByteBuffer buf = ByteBuffer.allocate(1024);
//3. 讀取本地檔案,併發送到服務端
while(inChannel.read(buf) != -1){
buf.flip();
sChannel.write(buf);
buf.clear();
}
//通道停止輸出
sChannel.shutdownOutput();
//接收服務端的反饋
int len = 0;
while((len = sChannel.read(buf)) != -1){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
//4. 關閉通道
inChannel.close();
sChannel.close();
}
//服務端
@Test
public void server() throws IOException{
//1. 獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//2. 繫結連線
ssChannel.bind(new InetSocketAddress(9898));
//3. 獲取客戶端連線的通道
SocketChannel sChannel = ssChannel.accept();
//4. 分配指定大小的緩衝區
ByteBuffer buf = ByteBuffer.allocate(1024);
//5. 接收客戶端的資料,並儲存到本地
while(sChannel.read(buf) != -1){
buf.flip();//實質是令limit=position;position=0
outChannel.write(buf);
buf.clear();
}
//傳送反饋給客戶端
buf.put("服務端接收資料成功".getBytes());
buf.flip();
sChannel.write(buf);
sChannel.close();
outChannel.close();
ssChannel.close();
}
}
非阻塞:
tcp:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
public class TestNonBlockingNIO {
//客戶端
@Test
public void client() throws IOException{
//1. 獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
//2. 切換非阻塞模式
sChannel.configureBlocking(false);
//3. 分配指定大小的緩衝區
ByteBuffer buf = ByteBuffer.allocate(1024);
//4. 傳送資料給服務端
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
//5. 關閉通道
sChannel.close();
}
//服務端
@Test
public void server() throws IOException{
//1. 獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切換非阻塞模式
ssChannel.configureBlocking(false);
//3. 繫結連線
ssChannel.bind(new InetSocketAddress(9898));
//4. 獲取選擇器
Selector selector = Selector.open();
//5. 將通道註冊到選擇器上, 並且指定“監聽接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 輪詢式的獲取選擇器上已經“準備就緒”的事件
while(selector.select() > 0){
//7. 獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)”
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
//8. 獲取準備“就緒”的是事件
SelectionKey sk = it.next();
//9. 判斷具體是什麼事件準備就緒
if(sk.isAcceptable()){
//10. 若“接收就緒”,獲取客戶端連線
SocketChannel sChannel = ssChannel.accept();
//11. 切換非阻塞模式
sChannel.configureBlocking(false);
//12. 將該通道註冊到選擇器上
sChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}else if(sk.isReadable()){
//13. 獲取當前選擇器上“讀就緒”狀態的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
//14. 讀取資料
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while((len = sChannel.read(buf)) > 0 ){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
//15. 取消選擇鍵 SelectionKey
it.remove();
}
}
}
}
UDP:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
/**
*
* UDP 客戶端讀 服務端 寫
*/
public class TestNonBlockingNIO2 {
/**
*服務端
*/
@Test
public void send() throws IOException{
DatagramChannel dc = DatagramChannel.open();
//切換非阻塞模式
dc.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + ":\n" + str).getBytes());
buf.flip();
dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
buf.clear();
}
dc.close();
}
/**
*客戶端
*/
@Test
public void receive() throws IOException{
DatagramChannel dc = DatagramChannel.open();
//切換非阻塞模式
dc.configureBlocking(false);
dc.bind(new InetSocketAddress(9898));
//獲取選擇器
Selector selector = Selector.open();
dc.register(selector, SelectionKey.OP_READ);
// 所有的選擇器選擇物件
while(selector.select() > 0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
//讀就緒
if(sk.isReadable()){
ByteBuffer buf = ByteBuffer.allocate(1024);
dc.receive(buf);
buf.flip();
System.out.println(new String(buf.array(), 0, buf.limit()));
buf.clear();
}
}
it.remove();
}
}
}
管道:
@Test
public void test1() throws IOException{
//1. 獲取管道
Pipe pipe = Pipe.open();
//2. 將緩衝區中的資料寫入管道
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通過單向管道傳送資料".getBytes());
buf.flip();
sinkChannel.write(buf);
//3. 讀取緩衝區中的資料
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
sourceChannel.close();
sinkChannel.close();
}