java-執行緒及NIO淺談
這裡簡單談一下執行緒,但是要把一個執行緒談好,要結合NIO,結合鎖機制一起學習,記憶才會深刻。所以,以下說明。結合這三個方面的執行緒和NIO進行談談,鎖部分另外再談。
1、程序
1.1概念
程序=程式+執行。當把一個程式從磁碟中載入到記憶體中,cpu去運算和處理這個程序(執行起來的程式就是程序)。
從三個維度來看程序的模型
維度 | 說明 |
---|---|
從記憶體維度 | 每個程序都獨佔一塊地址空間,cpu處理程序實際上就是處理這個程序記憶體的首地址到尾地址的資料庫資訊 |
從執行的邏輯維度 | 每一個程序都可以被CPU所處理和計算,此外,每一個程序也可以掛起,讓其他程序得以處理。在同一個時刻,只能有一個程序被cpu所處理。總結:程序模型,在巨集觀上是並行處理的,但是微觀上看,是“序列”處理的(單核)。如果是多核架構,巨集觀和微觀上都是並行處理的。 |
時間維度 | 每個程序執行一段時間之後,肯定都完成了一定的工作量。即程序是隨時間向前推進的。 |
1.2 為什麼會引入程序模型?
最開始的作業系統是單道程式設計(一個程式處理完,再處理下一個程式)
缺點:1、響應時間慢(客戶體驗差)2、cpu利用率非常低
例如:假設一個程序,20%需要做cpu運算,80%在做IO(傳送IO事件時,cpu時閒置的)=>cpu利用率是20%
單道程式設計模型:cup利用率=1-0.8=20%
多道程式設計模型下:
同時執行兩個程序的話:1-0.80.8=36%
同時執行三個程序的話:1-0.8
總結:引入程序模型,目的就是為了滿足多道程式設計,而多道程式設計的目的就是為了提高cpu的利用率。隨著程序數量的增加,cpu的利用率逐步提高。
1.3 程序產生和消亡的時間
產生 | 消亡 |
---|---|
1、作業系統會產生服務程序 | 1、程序的所有運算都處理完之後,自行退出。 |
2、父程序建立子程序使用者請求建立一個程序 | 2、程序在執行過程中產生錯誤或異常而強行退出。 |
3、使用者請求建立一個程序 | 3、一個程序被其他程序所殺死而退出。 |
程序的狀態
狀態 | 說明 |
---|---|
執行態 | 一個程序正在被cpu執行 |
掛起態 | 對於掛起態,要討論什麼原因導致的掛起:1、一個程序由於發生了某些阻塞操作,比如I/O事件,2、一個程序由於執行了很長時間,主動掛起,讓其他程序得以處理。3、使用者主動將程序掛起,比如sleep操作總結:針對第一類和第三類掛起的程序,即使把cpu讓給這個程序,cpu也處理不了;我們把這樣掛起的程序稱為:阻塞態程序。第二類程序,稱之為:就緒態程序。 |
所以細分程序的狀態:執行態|就緒態|阻塞態(計算機在做進行排程時,是不會排程和檢視阻塞態的程序的)
程序的狀態轉變
2、執行緒模型
2.1 概念
一個程序必須有一個執行緒,也可以有多個執行緒。
比如:拿word寫筆記,這個word程序會監聽打字;此外,每隔10分鐘(預設值)自動儲存一次。
總結:引入執行緒模型,目的就是為了讓程序可以同時做多件事(執行緒是程序的分身)。此外,引入執行緒模型後,cpu的最小執行單位就變成了執行緒。
最後總結:程序相當於資源組織單位,執行緒是cpu最小執行單位。
排程演算法
1、FCFS(First come first server )先來先服務排程演算法,處理先來的執行緒,執行緒處理完後再處理下一個來的執行緒。
2、時間片輪轉排程演算法。這個演算法的出現,解決了FCFS演算法的問題,有效縮短了響應時間,提高了cpu利用率。但是這個演算法可能存在的缺陷是:短任務處於飢餓狀態。
3、短任務優先排程演算法:優先去完成短任務。這個演算法的侷限性:可能會導致長任務時常處於飢餓狀態。
4、優先順序排程演算法:為執行緒分別分配優先順序,優先順序高的執行緒優先被處理。侷限性就是優先順序低的執行緒長期處於飢餓狀態。
5、混合排程演算法:這個演算法把之前演算法的優點進行結合,然後做執行緒排程
總結:
短任務優先排程演算法:更適合應用在大池子小佇列
大池子小佇列解釋:
- 沒有核心執行緒,臨時執行緒無限大(Integer的最大值),存活時間是60S,同步佇列(只能容納一個執行緒)
優點: - 適用於高訪問量、高併發的場景(短請求)。使用者請求不需要
- 等待排隊,能夠快速響應使用者請求
- 隱患:可能會出現執行緒的頻繁建立和銷燬。此外,如果使用者的請求都是長請求,
- 可能會發生記憶體溢位的場景
核心程式碼如下:
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>())
時間片輪轉排程演算法,從小池子大佇列去談。
小池子大佇列解釋:
- 核心執行緒數=最大執行緒數,沒有臨時執行緒
- 阻塞佇列是無限的
- 優點:能夠降低伺服器的壓力。
- 缺點:使用者請求的響應時間較長。
核心程式碼如下:
ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
3、 NIO和BIO
3.1 NIO和BIO對比
NIO(non blocking I/O)非阻塞I/O,jdk1.4引入的新I/O
之前學習的I/O操作是BIO,即阻塞I/O
BIO | NIO |
---|---|
面向流的:InputStream(),OuputStream位元組輸入流,位元組輸出流Reader,Writer | 面向緩衝區的(Buffer)NIO是通過緩衝區去操作資料的,可以用緩衝區靈活操作資料。 |
字元輸入流,字元輸出流.特點:流是有方向性的、連續不斷的,總結侷限性:不能靈活的操作流裡的資料 | 此外,NIO是面向通道的(Channel),在通道上,即可以讀資料,也可以寫資料。總結:通道相當於提供了運算環境,緩衝區是運輸資料的載體。 |
阻塞的IO比如Socket,它的底層用的BIO機制,accept()、connect()、write()、read()呼叫時會產生阻塞。阻塞模型的侷限性:不可能應對高併發、高訪問量的場景。 | NIO是非阻塞的IO,可以利用NIO處理高併發和高訪問量的場景。 |
3.2 BIO API使用
具體流程:
A:測試accept()方法的阻塞
/**accept()方法:建立一個連線,這個方法會產生阻塞,直到連線建立之後,阻塞才會放開。
* Listens for a connection to be made to this socket and accepts
* it. The method blocks until a connection is made.
* @throws Exception
*/
@Test
public void testAccept() throws Exception{
ServerSocket ss = new ServerSocket();
//繫結埠
ss.bind(new InetSocketAddress(6666));
//獲取連線
Socket sk = ss.accept();
System.out.println("有客戶端接入");
}
JUnit測試,“有客戶端接入”沒有輸出,說明accept()方法產生阻塞了。
B:然後新增connect()方法測試的程式碼:
/**connect()方法:作用根據指定的ip地址和埠號建立連線
* 該方法會產生阻塞,直到連線建立之後阻塞才會被放開。
* Connects this socket to the server with a specified timeout value.
* A timeout of zero is interpreted as an infinite timeout. The connection
* will then block until established or an error occurs.
* @throws Exception
*/
@Test
public void connect() throws Exception{
Socket sk = new Socket();
//建立連線
sk.connect(new InetSocketAddress("127.0.0.1", 6666));
System.out.println("連線建立");
先執行伺服器端方法(testAccept()),再執行客戶端方法,發現accept()方法阻塞釋放了。另外“連線建立”正確輸出。如果不先啟動伺服器端方法,而直接執行客戶端方法,發現先是阻塞了一下,然後JUnit測試丟擲異常。
總結:connect()方法會產生阻塞,直到連線成功,阻塞才釋放。
accept()方法產生的阻塞,直到伺服器獲得連線後,阻塞才釋放。
C:測試read()方法的阻塞性
C1: 再次修改testAccept()方法
//獲取連線
Socket sk = ss.accept();
System.out.println("有客戶端接入");
InputStream in = sk.getInputStream();
in.read();
System.out.println("讀取到了內容");
先執行伺服器的方法testAccept方法,在執行connect(),發現有客戶端連入輸出了,但是讀取到了內容並沒有顯示出來。說明read()方法也會產生阻塞。片刻後connect()快速執行完畢,socket連線斷開,接著testAccept()結束了。
C2:為了不讓連線中斷,需要修改testConnect()
System.out.println("連線建立");
while(true);
總結:read()方法會產生阻塞,直到讀取到內容後,阻塞才被釋放。
D:測試write()方法的阻塞性
D1:修改testAccept()方法
OutputStream out = sk.getOutputStream();
for(int i =1;i<200000;i++){
//65513*10
out.write("helloworld".getBytes());
System.out.println(i);
}
System.out.println("寫出了資料");
先執行伺服器端方法,再執行客戶端方法;發現i輸出值為65513,阻塞了。
for(int i =1;i<200000;i++){
//65513*10 ~ 131026*5
out.write("hello".getBytes());
System.out.println(i);
}
微調程式碼,輸出到131026阻塞了。
總結:write()方法也會產生阻塞,write()一直往出寫資料,但是沒有任何一方讀取資料,直到寫出到一定量(我的是655130B,不同電腦可能不同)的時候,產生阻塞。向網絡卡裝置緩衝區中寫資料。
3.3 NIO 相關API
Channel檢視API:
ServerSocketChannel, SocketChannel基於NIO的(基於tcp實現的,安全的基於握手機制)
DatagramChannel基於UDP協議,不安全
NIO-Channel API(緩衝通道)
accept和connect使用
/**NIO提供兩種模式:阻塞模式和非阻塞模式;預設是阻塞模式。
* 執行程式,“有客戶端連入了”並沒有輸出,說明NIO預設是阻塞模式。
* @throws Exception
*/
@Test
public void testAccept() throws Exception{
//建立ServerSocketChannel物件
ServerSocketChannel ssc = ServerSocketChannel.open();
//繫結監聽的埠
ssc.socket().bind(new InetSocketAddress(5555));
//獲取socketChannel連線
SocketChannel socketChannel = ssc.accept();
System.out.println("有客戶端連入了");
}
執行發現,並沒有輸出“有客戶端連線接入”,通道提供阻塞和非阻塞兩種模式,預設為阻塞模式。
可以在bind port之前新增ssc.configureBlocking(false);
設定通道的非阻塞模式。再次執行“有客戶端連線接入”便輸出了。
/**NIO提供兩種模式:阻塞模式和非阻塞模式;預設是阻塞模式。
* 執行程式,“有客戶端連入了”並沒有輸出,說明NIO預設是阻塞模式。
* @throws Exception
*/
@Test
public void testAccept() throws Exception{
//建立ServerSocketChannel物件
ServerSocketChannel ssc = ServerSocketChannel.open();
//設定為非阻塞模式
ssc.configureBlocking(false);
//繫結監聽的埠
ssc.socket().bind(new InetSocketAddress(5555));
//獲取socketChannel連線
SocketChannel socketChannel = ssc.accept();
System.out.println("有客戶端連入了");
}
/**connect()建立連線的方法,預設也會產生阻塞。
* @throws Exception
*/
@Test
public void connect() throws Exception{
//建立SocketChannel物件
SocketChannel sc = SocketChannel.open();
//建立連線
sc.connect(new InetSocketAddress("127.0.0.1", 5555));
System.out.println("連線成功");
}
**加sc.configureBlocking(false);**之前,執行該方法顯示阻塞一下然後丟擲異常,並沒有輸出“連線成功”,通道的connect()方法也是阻塞的;**使用方法sc.configureBlocking(false);**可以將客戶端連線通道設定為非阻塞模式。
accept和connect總結
/**NIO提供兩種模式:阻塞模式和非阻塞模式;預設是阻塞模式。
* 執行程式,“有客戶端連入了”並沒有輸出,說明NIO預設是阻塞模式。
* @throws Exception
*/
@Test
public void testAccept() throws Exception{
//建立ServerSocketChannel物件
ServerSocketChannel ssc = ServerSocketChannel.open();
//設定為非阻塞模式
ssc.configureBlocking(false);
//繫結監聽的埠
ssc.socket().bind(new InetSocketAddress(5555));
//獲取socketChannel連線
SocketChannel socketChannel = ssc.accept();
System.out.println("有客戶端連入了");
}
/**connect()建立連線的方法,預設也會產生阻塞。
* 設定為非阻塞模式的話,需要在呼叫connect方法之前,使用
* configureBlocking(false)將模式設定為非阻塞模式。
* @throws Exception
*/
@Test
public void connect() throws Exception{
//建立SocketChannel物件
SocketChannel sc = SocketChannel.open();
//設定阻塞模式為非阻塞
sc.configureBlocking(false);
//建立連線
sc.connect(new InetSocketAddress("127.0.0.1", 5555));
System.out.println("連線成功");
}
注意的是,需要在伺服器和客戶端分別設定阻塞模式。
3.4 read()、write()方法測試(過度)
sc.read(ByteBuffer dst)
sc.write(ByteBuffer src)
由於這兩個方法都需要ByteBuffer物件作為引數,所以我們需要先講ByteBuffer緩衝區。
3.4.1 NIO-ByteBuffer緩衝區API
/**ByteBuffer位元組快取區
* 1)三個重要的屬性:
* capacity:快取的容量
* limit:快取區的限制(position<limit)
* position:快取區的當前位置
* 2)重要的基本方法
* allocate(int cap):新建一個指定容量的快取區。
* flip():反轉快取區,將limit設定為position的值,在將position=0.
* hasRemaining():判斷position和limit限制之間是否還有元素。
* wrap("**".getBytes()):根據字串的內容建立“對應”的快取區。
*
* put(byte bt):向緩衝區中新增元素。執行一次position++
* get():從快取區中獲取元素。執行一次position++
*
* limit():獲取limit的值
* limit(int newlt):設定limit()的值
* position():獲取position的值
* position(int newPs):設定position的值
* @author jinxf
*
*/
public class ByteBufferDemo {
/**ByteBuffer位元組快取區
* 該類有三個重要的屬性:
* capacity:新緩衝區的容量,以位元組為單位 ,指定快取區的長度
* limit:緩衝區的限制(限定獲取元素的最大下標)
* position:緩衝區的位置
*/
@Test
public void bbInfo(){
ByteBuffer buf =ByteBuffer.allocate(10);
System.out.println();
}
/**向緩衝區中新增內容。
* 可以新增位元組內容;
* 也可以其他的基本資料型別,一般不建議使用。
* 每新增一個位元組後,position++。
* 其他兩個屬性capacity和limit不會發生改變。
*/
@Test
public void testPut(){
ByteBuffer buf = ByteBuffer.allocate(10);
Byte b1 = 1;
Byte b2 = 3;
buf.put(b1);
buf.put(b2);
//buf.putInt(10);
System.out.println();
}
/**get():從當前position值對應的位置獲取元素,每
* 獲取一個元素值後position++;
* buf.position()獲取postion值
* buf.position(int newPs):設定postion的值
* limit()獲取限定值
* limit(int newlt):設定限定的值。
* BufferUnderflowException:通過get()從緩衝區中取元素內容
* 時position>=limit
*/
@Test
public void testGet(){
ByteBuffer buf = ByteBuffer.allocate(10);
Byte b1 = 1;
Byte b2 = 3;
buf.put(b1);
buf.put(b2);
//獲取當前的位置的值buf.position();
//設定當前位置的值
buf.position(0);
//獲取limit的值:buf.limit();
buf.limit(2);
System.out.println(buf.get());
System.out.println(buf.get());
System.out.println(buf.get());
}
/*flip():反轉緩衝區的方法
* 作用是:
* 將limit的值設定為position
* 將position值設定為0
*/
@Test
public void testFilp() {
ByteBuffer buf = ByteBuffer.allocate(10);
Byte b1 = 1;
Byte b2 = 3;
buf.put(b1);
buf.put(b2);
Byte b3 = 4;
buf.put(b3);
buf.flip();
/*System.out.println(buf.get());
System.out.println(buf.get());
System.out.println(buf.get());
//當執行以下程式碼時丟擲異常
System.out.println(buf.get());*/
for(int i=0;i<buf.limit();i++){
System.out.println(buf.get());
}
}
/**hasRemaining()檢查position和limit之間是否還有元素。
* 本質是判斷position是否小於limit
*/
@Test
public void hasRemaining(){
ByteBuffer buf = ByteBuffer.allocate(10);
Byte b1 = 1;
Byte b2 = 3;
buf.put(b1);
buf.put(b2);
Byte b3 = 4;
buf.put(b3);
buf.flip();
while(buf.hasRemaining()){
System.out.println(buf.get());
}
}
@Test
public void testWrap() throws Exception{
/*ByteBuffer buf = ByteBuffer.wrap("HelloWorld".getBytes());*/
ByteBuffer buf = ByteBuffer.wrap("中國".getBytes("UTF-8"));
while(buf.hasRemaining()){
System.out.print(buf.get());
}
}
}
1、read()方法
修改NIODemo類的testAccept方法:
System.out.println("有客戶端連入了");
//讀取緩衝區中的內容
ByteBuffer buf = ByteBuffer.allocate(10);
//將讀取的內容儲存到緩衝區中
socketChannel.read(buf);
System.out.println("讀取到內容"+new String(buf.array()));
testConnect()方法不做任何修改,先執行testAccept()方法,發現在sc.read(buf)行丟擲了空指標異常。buf物件不可能為null,所以socketChannel為null.
非阻塞程式設計最大的問題:不知道是否真正的有客戶端接入,所以容易產生空指標;所以需要人為設定阻塞。
將**SocketChannel sc = ssc.accept();**改為:
//獲取socketChannel連線
SocketChannel socketChannel = null;
while(socketChannel==null){
socketChannel = ssc.accept();
}
再次執行testAccept()方法,空指標的問題解決了;然後再執行testConnect()方法,發現連線能夠正常建立,但是“讀取到內容**”並沒有輸出。
修改connect()方法:
while(true);
再次執行testAccept(),再執行connect(),”有客戶端連入了”輸出了,但是” 讀取到內容**”並沒有輸出。說明即使ssc服務通道設定了非阻塞,也沒有改變得到的通道socketChannel預設為阻塞模式,所以socketChannel.read(buf)阻塞了。要不想讓read()方法阻塞,需要在呼叫read()之前加
socketChannel.configureBlocking(false);
這樣即使沒有讀到資料,“讀取到內容”也能打印出來。
write()方法
修改contect()方法,追加以下程式碼:
ByteBuffer buf = ByteBuffer.wrap("helloworld".getBytes());
sc.write(buf);
while(true);
測試bug,先不執行伺服器端方法,直接執行客戶端方法connect(),輸出“連線成功”,但是sc.write(buf)行丟擲NotYetConnectedException異常。sc為何會丟擲該異常呢?非阻塞模式很坑的地方在於不知道連線是否真正的建立。修改connect():
while(!sc.isConnected()){
sc.finishConnect();
}
System.out.println("連線成功");
再次執行testConnect(),空指標解決了,但是有出現了新的異常:
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
先啟動伺服器端(testAccept()),後啟動客戶端(connect())即可。
手寫NIO非阻塞模式難度較大,程式碼不是重點,重要在於引出設計思想。
public class NIODemo {
/**NIO提供兩種模式:阻塞模式和非阻塞模式;預設是阻塞模式。
* 執行程式,“有客戶端連入了”並沒有輸出,說明NIO預設是阻塞模式。
* @throws Exception
*/
@Test
public void testAccept() throws Exception{
//建立ServerSocketChannel物件
ServerSocketChannel ssc = ServerSocketChannel.open();
//設定為非阻塞模式
ssc.configureBlocking(false);
//繫結監聽的埠
ssc.socket().bind(new InetSocketAddress(5555));
//獲取socketChannel連線
SocketChannel socketChannel = null;
while(socketChannel==null){
socketChannel = ssc.accept();
}
System.out.println("有客戶端連入了");
//設定當前socketChannel設定為非阻塞
socketChannel.configureBlocking(false);
//讀取緩衝區中的內容
ByteBuffer buf = ByteBuffer.allocate(10);
//將讀取的內容儲存到緩衝區中
socketChannel.read(buf);
System.out.println("讀取到內容"+new String(buf.array()));
}
/**connect()建立連線的方法,預設也會產生阻塞。
* 設定為非阻塞模式的話,需要在呼叫connect方法之前,使用
* configureBlocking(false)將模式設定為非阻塞模式。
* @throws Exception
*/
@Test
public void connect() throws Exception{
//建立SocketChannel物件
SocketChannel sc = SocketChannel.open();
//設定阻塞模式為非阻塞
sc.configureBlocking(false);
//建立連線
sc.connect(new InetSocketAddress("127.0.0.1", 5555));
while(!sc.isConnected()){
sc.finishConnect();
}
System.out.println("連線成功");
ByteBuffer buf = ByteBuffer.wrap("helloworld".getBytes());
sc.write(buf);
System.out.println("寫出資料");
while(true);
}
}
3.5 Selector設計思想
3.5.1問題的引入
使用BIO編寫程式碼模擬一下
(編寫一個伺服器端和客戶端程式,執行一次伺服器程式,執行四次客戶端程式模擬四個使用者執行緒)
public class Server {
public static void main(String[] args) throws Exception {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress(7777));
while(true){
Socket sk = ss.accept();
new Thread(new ClientRunner(sk)).start();
}
}
}
class ClientRunner implements Runnable{
Socket sk = null;
public ClientRunner(Socket sk){
this.sk = sk;
}
public void run() {
System.out.println("負責為客戶端提供服務,當前執行緒的id:"+Thread.currentThread().getId());
}
}
public class Client {
public static void main(String[] args) throws Exception {
Socket sk = new Socket();
sk.connect(new InetSocketAddress("127.0.0.1", 7777));
//System.out.println("連線建立");
while(true);
}
}
執行結果:
伺服器啟動
負責為客戶端提供服務,當前執行緒的id:9
負責為客戶端提供服務,當前執行緒的id:10
負責為客戶端提供服務,當前執行緒的id:11
負責為客戶端提供服務,當前執行緒的id:12
分析該模式的缺點:
缺點1:每有一個使用者請求,就會建立一個新的執行緒為之提供服務。當用戶請求量特別巨大,執行緒數量就會隨之增大,繼而記憶體的佔用增大,所以不適用於高併發、高訪問的場景。
缺點2:執行緒特別多,不僅佔用記憶體開銷,也會佔用大量的cpu開銷,因為cpu要做執行緒排程。
缺點3:如果一個使用者僅僅是連入操作,並且長時間不做其他操作,會產生大量閒置執行緒。會使cpu做無意義的空轉,降低整體效能。
缺點4:這個模型會導致真正需要被處理的執行緒(使用者請求)不能被及時處理。
3.5.2 解決方法
針對缺點3和缺點4,可以將閒置的執行緒設定為阻塞態,cpu是不會排程阻塞態的執行緒,避免了cpu的空轉。所以引入事件監聽機制實現。
Selector多路複用選擇器,起到事件監聽的作用。
監聽哪個使用者執行操作,就喚醒對應的執行緒執行。那麼都有哪些事件呢?
事件:1.accept事件、2.connect事件、3.read事件、4.write事件
針對缺點1和缺點2,可以利用非阻塞模型來實現,利用少量執行緒甚至一個執行緒來處理多使用者請求。但是注意,這個模型是有使用場景的,適用於大量短請求場景。(比如使用者訪問電商網站),不適合長請求場景(比如下載大檔案,這種場景,NIO不見得比BIO好)
擴充套件知識
驚群現象,隱患:cpu的負載會在短時間之內聚升,最嚴重的情況時出現短暫卡頓甚至宕機。第二個問題就是效能不高。
3.6 Selector服務通道API
accept事件
編寫伺服器端程式:
public class Server {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
//建立多路複用選擇器
Selector selector = Selector.open();
//在scc上註冊accpet事件,並指定由selector負責處理這些事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
//select()會產生阻塞,直到監聽到事件觸發阻塞的釋放
selector.select();
//獲取事件的集合物件
Set<SelectionKey> keys = selector.selectedKeys();
//獲取迭代器
Iterator<SelectionKey> iter = keys.iterator();
//遍歷迭代器
while(iter.hasNext()){
//sk就是一個封裝了事件資訊、通道資訊
SelectionKey sk = iter.next();
//當前事件是否為accept事件
if(sk.isAcceptable()){
//注意:要使用sk中封裝的ServerSocketChannel
ServerSocketChannel ss = (ServerSocketChannel)sk.channel();
//設定非阻塞
ss.configureBlocking(false);
SocketChannel sc = ss.accept();
sc.configureBlocking(false);
System.out.println("有客戶端連入,負責處理該請求的執行緒id:"+Thread.currentThread().getId());
//將該通道上的accpet事件
//OP_ACCEPT:1 << 4 0001 0000
//OP_READ:1<<0 0000 0001
//OP_WRITE = 1 << 2 0000 0100
//OP_CONNECT = 1 << 3 0000 1000
sc.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(sk.isReadable()){
}
if(sk.isWritable()){
}
//勿忘我:將該事件物件移除
iter.remove();
}
}
}
}
編寫客戶端程式碼:
public class Client {
public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 9999));
//System.out.println("連線建立");
while(true);
}
}
伺服器端啟動一次,客戶端啟動三次,伺服器端的控制檯輸出:
伺服器端啟動
有客戶端連入,負責處理該請求的執行緒id:1
有客戶端連入,負責處理該請求的執行緒id:1
有客戶端連入,負責處理該請求的執行緒id:1
處理多個請求使用同一個執行緒。
3.6.2 read事件
修改Server類
if(sk.isReadable()){
SocketChannel sc = (SocketChannel)sk.channel();
ByteBuffer buf = ByteBuffer.allocate(10);
sc.read(buf);
System.out.println("伺服器端讀到了資料:"+new String(buf.array()));
//去掉讀事件
//sc.register(selector,SelectionKey.OP_WRITE);
//0000 0101 sk.interestOps():原來的事件
//1111 1110 ~SelectionKey.OP_READ
//0000 0100
sc.register(selector, sk.interestOps()&~SelectionKey.OP_READ);
}
修改Client類
while(!sc.isConnected()){
sc.finishConnect();
}
ByteBuffer buf = ByteBuffer.wrap("helloworld".getBytes());
sc.write(buf);
3.6.3 write事件
修改Servet
if(sk.isWritable()){
SocketChannel sc = (SocketChannel)sk.channel();
ByteBuffer buf =ByteBuffer.wrap("收到".getBytes("UTF-8"));
sc.write(buf);
System.out.println("伺服器寫出了資料");
//去掉寫事件
sc.register(selector, sk.interestOps()&~SelectionKey.OP_WRITE);
}
修改Client類
ByteBuffer buf2 = ByteBuffer.allocate(6);
sc.read(buf2);
System.out.println("客戶端接收的內容:"+new String(buf2.array(),"UTF-8"));
public class Client2 {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 9999));
//對於客戶端,最開始要註冊連線監聽
Selector selector = Selector.open();
sc.register(selector, SelectionKey.OP_CONNECT);
while(true){
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iter = set.iterator();
while(iter.hasNext()){
SelectionKey sk = iter.next();
if(sk.isConnectable()){
}
if(sk.isWritable()){
}
if(sk.isReadable()){
}
iter.remove();
}
}
}
}