Java高併發程式設計:同步工具類
內容摘要
這裡主要介紹了java5中執行緒鎖技術以外的其他同步工具,首先介紹Semaphore:一個計數訊號量。用於控制同時訪問資源的執行緒個數,CyclicBarrier同步輔助類:從字面意思看是路障,這裡用於執行緒之間的相互等待,到達某點後,繼續向下執行。CountDownLatch同步輔助類:在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。猶如倒計時計數器,然後是Exchanger:實現兩個物件之間資料交換,可阻塞佇列:ArrayBlockingQueue,通過阻塞佇列間的通訊來演示其作用,最後介紹了幾個同步集合。
1. Semaphore實現訊號燈
Semaphore可以維護當前訪問自身的執行緒個數,並提供了同步機制,使用Semaphore可以控制同時訪問資源的執行緒個數,例如,實現一個檔案允許的併發訪問數。Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
Semaphore實現的功能就像:銀行辦理業務,一共有5個視窗,但一共有10個客戶,一次性最多有5個客戶可以進行辦理,其他的人必須等候,當5個客戶中的任何一個離開後,在等待的客戶中有一個人可以進行業務辦理。
Semaphore提供了兩種規則:
- 一種是公平的:獲得資源的先後,按照排隊的先後。在建構函式中設定true實現
- 一種是野蠻的:誰有本事搶到資源,誰就可以獲得資源的使用權。
與傳統的互斥鎖的異同:
單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖“,再由另外一個執行緒釋放”鎖“,這可以應用於死鎖恢復的一些場合。
應用場景:共享資源的爭奪,例如遊戲中選手進入房間的情況。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
//建立一個可根據需要建立新執行緒的執行緒池
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
//建立10個執行緒
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire(); //獲取燈,即許可權
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"進入,當前已有" + (3-sp.availablePermits()) + "個併發");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將離開");
sp.release(); // 釋放一個許可,將其返回給訊號量
//下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
System.out.println("執行緒" + Thread.currentThread().getName() +
"已離開,當前已有" + (3-sp.availablePermits()) + "個併發");
}
};
service.execute(runnable);
}
}
}
輸出結果
執行緒pool-1-thread-3進入,當前已有3個併發
執行緒pool-1-thread-2進入,當前已有3個併發
執行緒pool-1-thread-1進入,當前已有3個併發
執行緒pool-1-thread-2即將離開
執行緒pool-1-thread-2已離開,當前已有2個併發
執行緒pool-1-thread-5進入,當前已有3個併發
執行緒pool-1-thread-1即將離開
執行緒pool-1-thread-1已離開,當前已有2個併發
執行緒pool-1-thread-4進入,當前已有3個併發
執行緒pool-1-thread-4即將離開
執行緒pool-1-thread-4已離開,當前已有2個併發
執行緒pool-1-thread-8進入,當前已有3個併發
執行緒pool-1-thread-3即將離開
執行緒pool-1-thread-7進入,當前已有3個併發
執行緒pool-1-thread-3已離開,當前已有3個併發
執行緒pool-1-thread-8即將離開
執行緒pool-1-thread-8已離開,當前已有2個併發
執行緒pool-1-thread-9進入,當前已有3個併發
執行緒pool-1-thread-7即將離開
執行緒pool-1-thread-7已離開,當前已有2個併發
執行緒pool-1-thread-6進入,當前已有3個併發
執行緒pool-1-thread-9即將離開
執行緒pool-1-thread-9已離開,當前已有2個併發
執行緒pool-1-thread-10進入,當前已有3個併發
執行緒pool-1-thread-5即將離開
執行緒pool-1-thread-5已離開,當前已有2個併發
執行緒pool-1-thread-6即將離開
執行緒pool-1-thread-6已離開,當前已有1個併發
執行緒pool-1-thread-10即將離開
執行緒pool-1-thread-10已離開,當前已有0個併發
控制一個方法的併發量,比如同時只能有3個執行緒進來
public class ThreadPoolTest {
//訊號量
private static Semaphore semaphore = new Semaphore(3);//允許個數,相當於放了3把鎖
public static void main(String[] args) {
for(int i=0;i<10;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
method();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
//同時最多隻允許3個執行緒過來
public static void method() throws InterruptedException{
semaphore.acquire();//獲取一把鎖
System.out.println("ThreadName="+Thread.currentThread().getName()+"過來了");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ThreadName="+Thread.currentThread().getName()+"出去了");
semaphore.release();//釋放一把鎖
}
}
輸出結果
ThreadName=Thread-1過來了
ThreadName=Thread-4過來了
ThreadName=Thread-0過來了
ThreadName=Thread-1出去了
ThreadName=Thread-4出去了
ThreadName=Thread-2過來了
ThreadName=Thread-3過來了
ThreadName=Thread-0出去了
ThreadName=Thread-5過來了
ThreadName=Thread-3出去了
ThreadName=Thread-2出去了
ThreadName=Thread-6過來了
ThreadName=Thread-7過來了
ThreadName=Thread-5出去了
ThreadName=Thread-9過來了
ThreadName=Thread-7出去了
ThreadName=Thread-6出去了
ThreadName=Thread-8過來了
ThreadName=Thread-9出去了
ThreadName=Thread-8出去了
三個執行緒a、b、c 併發執行,b,c 需要a 執行緒的資料怎麼實現
根據問題的描述,我將問題用以下程式碼演示,ThreadA、ThreadB、ThreadC,ThreadA 用於初始化資料num,
只有當num 初始化完成之後再讓ThreadB 和ThreadC 獲取到初始化後的變數num。
分析過程如下:
考慮到多執行緒的不確定性,因此我們不能確保ThreadA 就一定先於ThreadB 和ThreadC 前執行,就算ThreadA先執行了,我們也無法保證ThreadA 什麼時候才能將變數num 給初始化完成。因此我們必須讓ThreadB 和ThreadC去等待ThreadA 完成任何後發出的訊息。
現在需要解決兩個難題,一是讓ThreadB 和ThreadC 等待ThreadA 先執行完,二是ThreadA 執行完之後給
ThreadB 和ThreadC 傳送訊息。
解決上面的難題我能想到的兩種方案,一是使用純Java API 的Semaphore 類來控制執行緒的等待和釋放,二是使用Android 提供的Handler 訊息機制
public class ThreadCommunication {
private static int num;//定義一個變數作為資料
public static void main(String[] args) {
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
try {
//模擬耗時操作之後初始化變數num
Thread.sleep(1000);
num = 1;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+"獲取到num 的值為:"+num);
}
});
Thread threadC = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+"獲取到num 的值為:"+num);
}
});
//同時開啟3 個執行緒
threadA.start();
threadB.start();
threadC.start();
}
}
public class ThreadCommunication {
private static int num;
/**
* 定義一個訊號量,該類內部維持了多個執行緒鎖,可以阻塞多個執行緒,釋放多個執行緒,
* 執行緒的阻塞和釋放是通過permit 概念來實現的執行緒通過semaphore.acquire()方法獲取permit,
* 如果當前semaphore 有permit 則分配給該執行緒,如果沒有則阻塞該執行緒直到semaphore
* 呼叫release()方法釋放permit。建構函式中引數:permit(允許) 個數
*/
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) {
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
try {
//模擬耗時操作之後初始化變數num
Thread.sleep(1000);
num = 1;
//初始化完引數後釋放兩個permit
semaphore.release(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
//獲取permit,如果semaphore 沒有可用的permit 則等待
// 如果有則消耗一個
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+"獲取到num 的值為:"+num);
}
});
Thread threadC = new Thread(new Runnable() {
@Override
public void run() {
try {
//獲取permit,如果semaphore 沒有可用的permit 則等待
// 如果有則消耗一個
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+"獲取到num 的值為:"+num);
}
});
//同時開啟3 個執行緒
threadA.start();
threadB.start();
threadC.start();
}
}
2. CyclicBarrier
一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。
CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。
3個執行緒到達某個集合點後再向下執行,使用await方法實現
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();//在所有參與者都已經在此 barrier 上呼叫 await 方法之前,將一直等待。
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
輸出結果
執行緒pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有2個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點1,當前已有3個已經到達,都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有3個已經到達,都到齊了,繼續走啊
執行緒pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候
執行緒pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊
3. CountDownLatch
一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。猶如倒計時計數器,呼叫CountDownLatch物件的countDown方法就將計數器減1,當計數到達0時,則所有等待者或單個等待者開始執行。
可以實現一個人(也可以是多個人)等待其他所有人都來通知他,也可以實現一個人通知多個人的效果,類似裁判一聲口令,運動員開始奔跑(一對多),或者所有運送員都跑到終點後裁判才可以公佈結果(多對一)。
用指定的計數 初始化 CountDownLatch。在呼叫 countDown() 方法之前,await 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await 的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
實現運動員比賽的效果
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//構造一個用給定計數初始化的 CountDownLatch,相當於裁判的口哨
final CountDownLatch cdOrder = new CountDownLatch(1);
//相當於定義3個執行員
final CountDownLatch cdAnswer = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
System.out.println("執行緒"
+ Thread.currentThread().getName() + "正準備接受命令");
// 等待發令槍
cdOrder.await();//使當前執行緒在鎖存器倒計數至零之前一直等待
System.out.println("執行緒"
+ Thread.currentThread().getName() + "已接受命令");
Thread.sleep((long) (Math.random() * 10000));
System.out
.println("執行緒"
+ Thread.currentThread().getName()
+ "迴應命令處理結果");
// 各個運動員完報告成績之後,通知裁判
cdAnswer.countDown();//遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("執行緒" + Thread.currentThread().getName()
+ "即將釋出命令");
// 發令槍打響,比賽開始
cdOrder.countDown();
System.out.println("執行緒" + Thread.currentThread().getName()
+ "已傳送命令,正在等待結果");
// 裁判等待各個運動員的結果
cdAnswer.await();
// 裁判公佈獲得所有運動員的成績
System.out.println("執行緒" + Thread.currentThread().getName()
+ "已收到所有響應結果");
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
輸出結果
執行緒pool-1-thread-2正準備接受命令
執行緒pool-1-thread-3正準備接受命令
執行緒pool-1-thread-1正準備接受命令
執行緒main即將釋出命令
執行緒main已傳送命令,正在等待結果
執行緒pool-1-thread-1已接受命令
執行緒pool-1-thread-2已接受命令
執行緒pool-1-thread-3已接受命令
執行緒pool-1-thread-1迴應命令處理結果
執行緒pool-1-thread-3迴應命令處理結果
執行緒pool-1-thread-2迴應命令處理結果
執行緒main已收到所有響應結果
4. Exchanger
用於實現兩個物件之間的資料交換,每個物件在完成一定的事務後想與對方交換資料,第一個先拿出資料的物件將一直等待第二個物件拿著資料到來時,彼此才能交換資料。
方法:exchange(V x)
等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。
應用:使用 Exchanger 線上程間交換緩衝區
示例:模擬毒品交易情景
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable(){
public void run() {
try {
String data1 = "毒品";
System.out.println("執行緒" + Thread.currentThread().getName() +
"正在把: " + data1 +" 交易出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("執行緒" + Thread.currentThread().getName() +
"換得了: " + data2);
}catch(Exception e){
}
}
});
service.execute(new Runnable(){
public void run() {
try {
String data1 = "美金";
System.out.println("執行緒" + Thread.currentThread().getName() +
"正在把: " + data1 +" 交易出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("執行緒" + Thread.currentThread().getName() +
"換得了: " + data2);
}catch(Exception e){
}
}
});
}
}
執行緒pool-1-thread-1正在把: 毒品 交易出去
執行緒pool-1-thread-2正在把: 美金 交易出去
執行緒pool-1-thread-1換得了: 美金
執行緒pool-1-thread-2換得了: 毒品
5. ArrayBlockingQueue
一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列包含固定長度的佇列和不固定長度的佇列。
這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。
通俗的講:當指定佇列大小,如果已經放滿,其他存入資料的執行緒就阻塞,等著該佇列中有空位,才能放進去。當取的比較快,佇列中沒有資料,取資料的執行緒阻塞,等佇列中放入了資料,才可以取。
ArrayBlockingQueue中只有put和take方法才具有阻塞功能。方法型別如下
功能 | 丟擲異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
示例:用3個空間的佇列來演示向阻塞佇列中存取資料的效果
package cn.xushuai.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
final BlockingQueue queue = new ArrayBlockingQueue(3);
for(int i=0;i<2;i++){
new Thread(){
public void run(){
while(true){
try {
Thread.sleep((long)(Math.random()*1000));
System.out.println(Thread.currentThread().getName() + "準備放資料!");
queue.put(1); //放進去後,可能立即執行“準備取資料”
System.out.println(Thread.currentThread().getName() + "已經放了資料," +
"佇列目前有" + queue.size() + "個數據");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread(){
public void run(){
while(true){
try {
//將此處的睡眠時間分別改為100和1000,觀察執行結果
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "準備取資料!");
queue.take(); //取出後可能來不及執行下面的列印語句,就跑到了“準備放資料”,
System.out.println(Thread.currentThread().getName() + "已經取走資料," +
"佇列目前有" + queue.size() + "個數據");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有1個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有2個數據
Thread-1準備放資料!
Thread-1已經放了資料,佇列目前有3個數據
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-1準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-1已經放了資料,佇列目前有3個數據
Thread-1準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-2準備取資料!
...
6. 阻塞佇列間的通訊
A佇列向空間中存資料,B從空間裡取資料,A存入後,通知B去取,B取過之後,通知A去放,依次迴圈
示例:子執行緒先迴圈10次,接著主執行緒迴圈100次,接著又回到子執行緒,迴圈10次,再回到主執行緒又迴圈100,如此迴圈50次。
說明:這裡通過使 用兩個具有1個空間的佇列來實現同步通知的功能(實現了鎖和condition的功能),以便實現佇列間的通訊,其中使用到了構造程式碼塊為主佇列先存入一個數據,以使其先阻塞,子佇列先執行。
使用構造程式碼塊的原因:
成員變數在建立類的例項物件時,才分配空間,才能有值,所以建立一個構造方法來給main_quene賦值,這裡不可以使用靜態程式碼塊,因為靜態在還沒建立物件就存在, 而sub_quene和main_quene是物件建立以後的成員變數,所以這裡用匿名構造方法,它的執行時期在任何構造方法之前,建立幾個物件就執行幾次
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueCommunication {
public static void main(String[] args){
final Business business = new Business();
new Thread(new Runnable(){
@Override
public void run() {
for(int i=1;i<=50;i++){
business.sub(i);
}
}
}).start();
//主執行緒外部迴圈
for(int i=1;i<=50;i++){
business.main(i);
}
}
//業務類
static class Business{
BlockingQueue<Integer> sub_quene = new ArrayBlockingQueue<Integer>(1);
BlockingQueue<Integer> main_quene = new ArrayBlockingQueue<Integer>(1);
{
//為了讓子佇列先走,所以在一開始就往主佇列中存入一個物件,使其阻塞。
try {
main_quene.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//子佇列先走
public void sub(int i){
try {
sub_quene.put(1); //子佇列第一次存入,可以執行,但由於只有1個空間,已經存滿,所以只有在執行後要等到take之後才能繼續下次執行
} catch (InterruptedException e) {
e.printStackTrace();
}
//子佇列迴圈執行
for(int j=1;j<=10;j++){
System.out.println("sub thread sequence of"+i+",loop of "+j);
}
try {
main_quene.take(); //讓主佇列從已經填滿的佇列中取出資料,使其開始第一次執行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void main(int i){
try {
main_quene.put(1); //主佇列先前放過1個空間,現在處於阻塞狀態,等待子佇列通知,即子執行緒中的main_quene.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
//主佇列迴圈執行
for(int j=1;j<=100;j++){
System.out.println("main thread sequence of"+i+", loop of "+j);
}
try {
sub_quene.take(); //讓子佇列從已經填滿的佇列中取出資料,使其執行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
7. 同步集合類
7.1 同步Map集合
- java.util.concurrent.ConcurrentMap
- ConcurrentHashMap
- ConcurrentNavigableMap
- ConcurrentSkipListMap
ConcurrentHashMap
同步的HashMap,支援獲取的完全併發和更新的所期望可調整併發的雜湊表。此類遵守與 Hashtable 相同的功能規範,並且包括對應於 Hashtable 的每個方法的方法版本。
不過,儘管所有操作都是執行緒安全的,但獲取操作不 必鎖定,並且不 支援以某種防止所有訪問的方式鎖定整個表。此類可以通過程式完全與 Hashtable 進行互操作,這取決於其執行緒安全,而與其同步細節無關。
內部原理:
其實內部使用了代理模式,你給我一個HashMap,我就給你一個同步的HashMap。同步的HashMap在呼叫方法時,是去分配給原始的HashMap只是在去呼叫方法的同時加上了Synchronized,以此實現同步效果
ConcurrentHashMap是執行緒安全的HashMap的實現,預設構造同樣有initialCapacity和loadFactor屬性,不過還多了一個concurrencyLevel屬性,三屬性預設值分別為16、0.75及16。其內部使用鎖分段技術,維持這鎖Segment的陣列,在Segment陣列中又存放著Entity[]陣列,內部hash演算法將資料較均勻分佈在不同鎖中。
- put(key , value)
並沒有在此方法上加上synchronized,首先對key.hashcode進行hash操作,得到key的hash值。hash操作的演算法和map也不同,根據此hash值計算並獲取其對應的陣列中的Segment物件(繼承自ReentrantLock),接著呼叫此Segment物件的put方法來完成當前操作。
ConcurrentHashMap基於concurrencyLevel劃分出了多個Segment來對key-value進行儲存,從而避免每次put操作都得鎖住整個陣列。在預設的情況下,最佳情況下可允許16個執行緒併發無阻塞的操作集合物件,儘可能地減少併發時的阻塞現象。
- get(key)
首先對key.hashCode進行hash操作,基於其值找到對應的Segment物件,呼叫其get方法完成當前操作。而Segment的get操作首先通過hash值和物件陣列大小減1的值進行按位與操作來獲取陣列上對應位置的HashEntry。在這個步驟中,可能會因為物件陣列大小的改變,以及陣列上對應位置的HashEntry產生不一致性,那麼ConcurrentHashMap是如何保證的?
物件陣列大小的改變只有在put操作時有可能發生,由於HashEntry物件陣列對應的變數是volatile型別的,因此可以保證如HashEntry物件陣列大小發生改變,讀操作可看到最新的物件陣列大小。
在獲取到了HashEntry物件後,怎麼能保證它及其next屬性構成的連結串列上的物件不會改變呢?這點ConcurrentHashMap採用了一個簡單的方式,即HashEntry物件中的hash、key、next屬性都是final的,這也就意味著沒辦法插入一個HashEntry物件到基於next屬性構成的連結串列中間或末尾。這樣就可以保證當獲取到Hash