Java執行緒間通訊與訊號量
阿新 • • 發佈:2019-02-14
1. 訊號量Semaphore
先說說Semaphore,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。一般用於
控制併發執行緒數,及執行緒間互斥
。另外重入鎖 ReentrantLock 也可以實現該功能,但實現上要複雜些。
功能就類似廁所有5個坑,假如有10個人要上廁所,那麼同時只能有多少個人去上廁所呢?同時只能有5個人能夠佔用,當5個人中 的任何一個人讓開後,其中等待的另外5個人中又有一個人可以佔用了。另外等待的5個人中可以是隨機獲得優先機會,也可以是按照先來後到的順序獲得機會。
單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合。
例子:
/** * @Description: * @param @param args * @return void 返回型別 */ public static void main(String[] args) { // 執行緒池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5個執行緒同時訪問 final Semaphore semp = new Semaphore(5); // 模擬20個客戶端訪問 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 獲取許可 semp.acquire(); System.out.println("獲得Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); // 訪問完後,釋放 semp.release(); System.out.println("剩餘可用訊號-----------------" + semp.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } } }; exec.execute(run); } // 退出執行緒池 exec.shutdown(); }
輸出結果(可以想想為什麼會這樣輸出):
獲得Accessing: 1 獲得Accessing: 5 獲得Accessing: 2 獲得Accessing: 3 獲得Accessing: 0 剩餘可用訊號-----------------1 獲得Accessing: 4 剩餘可用訊號-----------------1 獲得Accessing: 9 剩餘可用訊號-----------------1 獲得Accessing: 8 剩餘可用訊號-----------------1 獲得Accessing: 6 剩餘可用訊號-----------------1 獲得Accessing: 10 剩餘可用訊號-----------------1 獲得Accessing: 11 剩餘可用訊號-----------------1 獲得Accessing: 12 剩餘可用訊號-----------------1 獲得Accessing: 13 剩餘可用訊號-----------------1 獲得Accessing: 7 剩餘可用訊號-----------------1 獲得Accessing: 15 剩餘可用訊號-----------------1 獲得Accessing: 16 剩餘可用訊號-----------------1 獲得Accessing: 17 剩餘可用訊號-----------------1 獲得Accessing: 14 剩餘可用訊號-----------------1 獲得Accessing: 18 剩餘可用訊號-----------------1 獲得Accessing: 19 剩餘可用訊號-----------------1 剩餘可用訊號-----------------2 剩餘可用訊號-----------------3 剩餘可用訊號-----------------4 剩餘可用訊號-----------------5
2. 使用PIPE作為執行緒間通訊橋樑
Pipe有一個source通道和一個sink通道。資料會被寫到sink通道,從source通道讀取。一進一出。先作為初步瞭解怎麼使用。
值得注意的是該類在java.nio.channels下,說明該類屬於nio方式的資料通訊方式,那就使用Buffer來緩衝資料。
Pipe原理的圖示:
- Pipe就是個空管子,這個空管子一頭可以從管子裡往外讀,一頭可以往管子裡寫
- 操作流程:
- 1.首先要有一個物件往這個空管子裡面寫。寫到哪裡呢?這個空管子是有一點空間的,就在這個管子裡。
寫的時候就是寫到管子本身包含的這段空間裡的。這段空間大小是1024個位元組。 - 2.然後另一個物件才能將這個裝滿了的管子裡的內容讀出來。
- 1.首先要有一個物件往這個空管子裡面寫。寫到哪裡呢?這個空管子是有一點空間的,就在這個管子裡。
上程式碼
package com.jx.test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
public class testPipe {
/**
* @Description:
* @param @param args
* @return void 返回型別
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// 建立一個管道
Pipe pipe = Pipe.open();
final Pipe.SinkChannel psic = pipe.sink();// 要向管道寫資料,需要訪問sink通道
final Pipe.SourceChannel psoc = pipe.source();// 從讀取管道的資料,需要訪問source通道
Thread tPwriter = new Thread() {
public void run() {
try {
System.out.println("send.....");
// 建立一個執行緒,利用管道的寫入口Pipe.SinkChannel型別的psic往管道里寫入指定ByteBuffer的內容
int res = psic.write(ByteBuffer
.wrap("Hello,Pipe!測試通訊.....".getBytes("utf-16BE")));
System.out.println("send size:" + res);
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread tPreader = new Thread() {
public void run() {
int bbufferSize = 1024 * 2;
ByteBuffer bbuffer = ByteBuffer.allocate(bbufferSize);
try {
System.out.println("recive.....");
// 建立一個執行緒,利用管道的讀入口Pipe.SourceChannel型別的psoc將管道里內容讀到指定的ByteBuffer中
int res = psoc.read(bbuffer);//資料未
System.out.println("recive size:"+res+" Content:" + ByteBufferToString(bbuffer));
} catch (Exception e) {
e.printStackTrace();
}
}
};
tPwriter.start();
tPreader.start();
}
/**
*ByteBuffer--> String的轉換函式
*/
public static String ByteBufferToString(ByteBuffer content) {
if (content == null || content.limit() <= 0
|| (content.limit() == content.remaining())) {
System.out.println("不存在或內容為空!");
return null;
}
int contentSize = content.limit() - content.remaining();
StringBuffer resultStr = new StringBuffer();
for (int i = 0; i < contentSize; i += 2) {
resultStr.append(content.getChar(i));
}
return resultStr.toString();
}
}