解決多執行緒安全問題-無非兩個方法synchronized和lock
部落格引用處(以下內容在原有部落格基礎上進行補充或更改,謝謝這些大牛的部落格指導):
解決多執行緒安全問題-無非兩個方法synchronized和lock 具體原理(百度)
還有其他的鎖,如果想要了解,參考:JAVA鎖機制-可重入鎖,可中斷鎖,公平鎖,讀寫鎖,自旋鎖,
解決多執行緒的併發安全問題,java無非就是加鎖,具體就是兩個方法
(1) Synchronized(java自帶的關鍵字)
(2) lock 可重入鎖 (可重入鎖這個包java.util.concurrent.locks 底下有兩個介面,分別對應兩個類實現了這個兩個介面:
(a)lock介面, 實現的類為:ReentrantLock類 可重入鎖; (b)readwritelock介面,實現類為:ReentrantReadWriteLock 讀寫鎖)
也就是說有三種:
(1)synchronized 是互斥鎖;
(2)ReentrantLock 顧名思義 :可重入鎖
(3)ReentrantReadWriteLock :讀寫鎖
讀寫鎖特點:
a)多個讀者可以同時進行讀
b)寫者必須互斥(只允許一個寫者寫,也不能讀者寫者同時進行)
c)寫者優先於讀者(一旦有寫者,則後續讀者必須等待,喚醒時優先考慮寫者)
總結來說,Lock和synchronized有以下幾點不同:
1)Lock是一個介面,而synchronized是Java中的關鍵字,synchronized是內建的語言實現;
2)synchronized在發生異常時,會自動釋放執行緒佔有的鎖,因此不會導致死鎖現象發生;而Lock在發生異常時,如果沒有主動通過unLock()去釋放鎖,則很可能造成死鎖現象,因此使用Lock時需要在finally塊中釋放鎖;
3)Lock可以讓等待鎖的執行緒響應中斷,而synchronized卻不行,使用synchronized時,等待的執行緒會一直等待下去,不能夠響應中斷;
4)通過Lock可以知道有沒有成功獲取鎖,而synchronized卻無法辦到。
5)Lock可以提高多個執行緒進行讀操作的效率。
在效能上來說,如果競爭資源不激烈,兩者的效能是差不多的,而當競爭資源非常激烈時(即有大量執行緒同時競爭),此時Lock的效能要遠遠優於synchronized。所以說,在具體使用時要根據適當情況選擇。
首先看一下Synchronized的原理:
1、synchronized
把程式碼塊宣告為 synchronized,有兩個重要後果,通常是指該程式碼具有 原子性(atomicity)和 可見性(visibility)。
(1) 原子性
原子性意味著個時刻,只有一個執行緒能夠執行一段程式碼,這段程式碼通過一個monitor object保護。從而防止多個執行緒在更新共享狀態時相互衝突。
(2) 可見性
可見性則更為微妙,它要對付記憶體快取和編譯器優化的各種反常行為。啥是可見性呢?
答:它必須確保釋放鎖之前對共享資料做出的更改對於隨後獲得該鎖的另一個執行緒是可見的 。
作用:如果沒有同步機制提供的這種可見性保證,執行緒看到的共享變數可能是修改前的值或不一致的值,這將引發許多嚴重問題。
一般來說,執行緒以某種不必讓其他執行緒立即可以看到的方式(不管這些執行緒在暫存器中、在處理器特定的快取中,還是通過指令重排或者其他編譯器優化),不受快取變數值的約束,但是如果開發人員使用了同步,那麼執行庫將確保某一執行緒對變數所做的更新先於對現有synchronized 塊所進行的更新,當進入由同一監控器(lock)保護的另一個synchronized 塊時,將立刻可以看到這些對變數所做的更新。類似的規則也存在於volatile變數上。
——volatile只保證可見性,不保證原子性!
(3)synchronize的限制:
- 當執行緒嘗試獲取鎖的時候,如果獲取不到鎖會一直阻塞, 它無法中斷一個正在等候獲得鎖的執行緒;
- 如果獲取鎖的執行緒進入休眠或者阻塞,除非當前執行緒異常,否則其他執行緒嘗試獲取鎖必須一直等待,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖。
2、ReentrantLock (可重入鎖)
何為可重入(美團面試提問過此處):參考:如何理解ReentrantLock的可重入和互斥?
可重入的意思是某一個執行緒是否可多次獲得一個鎖,在繼承的情況下,如果不是可重入的,那就形成死鎖了,比如遞迴呼叫自己的時候;,如果不能可重入,每次都獲取鎖不合適,比如synchronized就是可重入的,ReentrantLock也是可重入的
鎖的概念就不用多解釋了,當某個執行緒A已經持有了一個鎖,當執行緒B嘗試進入被這個鎖保護的程式碼段的時候.就會被阻塞.而鎖的操作粒度是”執行緒”,而不是呼叫(至於為什麼要這樣,下面解釋).同一個執行緒再次進入同步程式碼的時候.可以使用自己已經獲取到的鎖,這就是可重入鎖java裡面內建鎖(synchronize)和Lock(ReentrantLock)都是可重入的
例子:
package entrantlock_test;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class parent {
protected Lock lock=new ReentrantLock();
public void test(){
lock.lock();
try{
System.out.println("Parent");
}finally{
lock.unlock();
}
}
}
class Sub extends parent{
@Override
public void test() {
// TODO Auto-generated method stub
lock.lock();
try{
super.test();
System.out.println("Sub");
}finally{
lock.unlock();
}
}
}
public class LockTest{
public static void main(String[] args){
Sub s=new Sub();
s.test();
}
}
2.1 . 為什麼要可重入
如果執行緒A繼續再次獲得這個鎖呢?比如一個方法是synchronized,遞迴呼叫自己,那麼第一次已經獲得了鎖,第二次呼叫的時候還能進入嗎? 直觀上當然需要能進入.這就要求必須是可重入的.可重入鎖又叫做遞迴鎖,不然就死鎖了。
它實現方式是:
為每個鎖關聯一個獲取計數器和一個所有者執行緒,當計數值為0的時候,這個所就沒有被任何執行緒只有.當執行緒請求一個未被持有的鎖時,JVM將記下鎖的持有者,並且將獲取計數值置為1,如果同一個執行緒再次獲取這個鎖,技術值將遞增,退出一次同步程式碼塊,計算值遞減,當計數值為0時,這個鎖就被釋放.ReentrantLock裡面有實現
其實也有不可重入鎖:這個還真有.Linux下的pthread_mutex_t鎖是預設是非遞迴的。可以通過設定PTHREAD_MUTEX_RECURSIVE屬性,將pthread_mutex_t鎖設定為遞迴鎖。如果要自己實現不可重入鎖,同可重入鎖,這個計數器只能為1.或者0,再次進入的時候,發現已經是1了,就進行阻塞.jdk裡面沒有預設的實現類.
Java.util.concurrent.lock 中的Lock 框架是鎖定的一個抽象,Lock彌補了synchronized的侷限,提供了更加細粒度的加鎖功能。
ReentrantLock 類是唯一實現了Lock的類 ,它擁有與synchronized 相同的併發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的效能。(換句話說,當許多執行緒都想訪問共享資源時,JVM 可以花更少的時候來排程執行緒,把更多時間用在執行執行緒上。)
用sychronized修飾的方法或者語句塊在程式碼執行完之後鎖自動釋放,而是用Lock需要我們手動釋放鎖,所以為了保證鎖最終被釋放(發生異常情況),要把互斥區放在try內,釋放鎖放在finally內!!
Lock 介面api如下
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
其中最常用的就是lock和unlock操作了。因為使用lock時,需要手動的釋放鎖,所以需要使用try…catch來包住業務程式碼,並且在finally中釋放鎖。典型使用如下
private Lock lock = new ReentrantLock();
public void test(){
lock.lock();
try{
doSomeThing();
}catch (Exception e){
// ignored
}finally {
lock.unlock();
}
}
2.2 AQS
AbstractQueuedSynchronizer簡稱AQS,是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題。
AQS使用一個FIFO的隊列表示排隊等待鎖的執行緒,它維護一個status的變數,每個節點維護一個waitstatus的變數,當執行緒獲取到鎖的時候,佇列的status置為1,此執行緒執行完了,那麼它的waitstatus為-1;佇列頭部的執行緒執行完畢之後,它會呼叫它的後繼的執行緒(百度面試)。
佇列頭節點稱作“哨兵節點”或者“啞節點”,它不與任何執行緒關聯。其他的節點與等待執行緒關聯,每個節點維護一個等待狀態waitStatus。如圖
AQS中還有一個表示狀態的欄位state,例如ReentrantLocky用它表示執行緒重入鎖的次數,Semaphore用它表示剩餘的許可數量,FutureTask用它表示任務的狀態。對state變數值的更新都採用CAS操作保證更新操作的原子性。
AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個類只有一個變數:exclusiveOwnerThread,表示當前佔用該鎖的執行緒,並且提供了相應的get,set方法。
理解AQS可以幫助我們更好的理解JCU包中的同步容器。
3 lock()與unlock()實現原理
ReentrantLock是Lock的預設實現之一。那麼lock()和unlock()是怎麼實現的呢?首先我們要弄清楚幾個概念
-
可重入鎖。可重入鎖是指同一個執行緒可以多次獲取同一把鎖。ReentrantLock和synchronized都是可重入鎖。
-
可中斷鎖。可中斷鎖是指執行緒嘗試獲取鎖的過程中,是否可以響應中斷。synchronized是不可中斷鎖,而ReentrantLock則提供了中斷功能。
-
公平鎖與非公平鎖。公平鎖是指多個執行緒同時嘗試獲取同一把鎖時,獲取鎖的順序按照執行緒達到的順序,而非公平鎖則允許執行緒“插隊”。synchronized是非公平鎖,而ReentrantLock的預設實現是非公平鎖,但是也可以設定為公平鎖。
-
CAS操作(CompareAndSwap)。CAS操作簡單的說就是比較並交換。CAS 操作包含三個運算元 ——
記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS 指令之前返回該位置的值。CAS 有效地說明了“我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。”
Java併發包(java.util.concurrent)中大量使用了CAS操作,涉及到併發的地方都呼叫了sun.misc.Unsafe類方法進行CAS操作。ReentrantLock提供了兩個構造器,分別是
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
預設構造器初始化為NonfairSync物件,即非公平鎖,而帶引數的構造器可以指定使用公平鎖和非公平鎖。由lock()和unlock的原始碼可以看到,它們只是分別呼叫了sync物件的lock()和release(1)方法。
Sync是ReentrantLock的內部類,它的結構如下
可以看到Sync擴充套件了AbstractQueuedSynchronizer。
3.3 NonfairSync
我們從原始碼出發,分析非公平鎖獲取鎖和釋放鎖的過程。
3.3.1 lock()
lock()原始碼如下
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先用一個CAS操作,判斷state是否是0(表示當前鎖未被佔用),如果是0則把它置為1,並且設定當前執行緒為該鎖的獨佔執行緒,表示獲取鎖成功。當多個執行緒同時嘗試佔用同一個鎖時,CAS操作只能保證一個執行緒操作成功,剩下的只能乖乖的去排隊啦。
**“非公平”即體現在這裡,如果佔用鎖的執行緒剛釋放鎖,state置為0,而排隊等待鎖的執行緒還未喚醒時,新來的執行緒就直接搶佔了該鎖,那麼就“插隊”了(請注意此處的非公平鎖是指新來的執行緒跟佇列頭部的執行緒競爭鎖,佇列其他的執行緒還是正常排隊,百度面試題)。**
若當前有三個執行緒去競爭鎖,假設執行緒A的CAS操作成功了,拿到了鎖開開心心的返回了,那麼執行緒B和C則設定state失敗,走到了else裡面。我們往下看acquire。
acquire(arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
程式碼非常簡潔,但是背後的邏輯卻非常複雜,可見Doug Lea大神的程式設計功力。
- 第一步。嘗試去獲取鎖。如果嘗試獲取鎖成功,方法直接返回。
tryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) {
//獲取當前執行緒
final Thread current = Thread.currentThread();
//獲取state變數值
int c = getState();
if (c == 0) { //沒有執行緒佔用鎖
if (compareAndSetState(0, acquires)) {
//佔用鎖成功,設定獨佔執行緒為當前執行緒
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { //當前執行緒已經佔用該鎖
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新state值為新的重入次數
setState(nextc);
return true;
}
//獲取鎖失敗
return false;
}
非公平鎖tryAcquire的流程是:檢查state欄位,若為0,表示鎖未被佔用,那麼嘗試佔用,若不為0,檢查當前鎖是否被自己佔用,若被自己佔用,則更新state欄位,表示重入鎖的次數。如果以上兩點都沒有成功,則獲取鎖失敗,返回false。
-
第二步,入隊。由於上文中提到執行緒A已經佔用了鎖,所以B和C執行tryAcquire失敗,並且入等待佇列。如果執行緒A拿著鎖死死不放,那麼B和C就會被掛起。
先看下入隊的過程。
先看addWaiter(Node.EXCLUSIVE)
/**
* 將新節點和當前執行緒關聯並且入佇列
* @param mode 獨佔/共享
* @return 新節點
*/
private Node addWaiter(Node mode) {
//初始化節點,設定關聯執行緒和模式(獨佔 or 共享)
Node node = new Node(Thread.currentThread(), mode);
// 獲取尾節點引用
Node pred = tail;
// 尾節點不為空,說明佇列已經初始化過
if (pred != null) {
node.prev = pred;
// 設定新節點為尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾節點為空,說明佇列還未初始化,需要初始化head節點併入隊新節點
enq(node);
return node;
}
B、C執行緒同時嘗試入佇列,由於佇列尚未初始化,tail==null,故至少會有一個執行緒會走到enq(node)。我們假設同時走到了enq(node)裡。
/**
* 初始化佇列並且入隊新節點
*/
private Node enq(final Node node) {
//開始自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 如果tail為空,則新建一個head節點,並且tail指向head
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// tail不為空,將新節點入隊
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這裡體現了經典的自旋+CAS組合來實現非阻塞的原子操作。由於compareAndSetHead的實現使用了unsafe類提供的CAS操作,所以只有一個執行緒會建立head節點成功。假設執行緒B成功,之後B、C開始第二輪迴圈,此時tail已經不為空,兩個執行緒都走到else裡面。假設B執行緒compareAndSetTail成功,那麼B就可以返回了,C由於入隊失敗還需要第三輪迴圈。最終所有執行緒都可以成功入隊。
當B、C入等待佇列後,此時AQS佇列如下:
3. 第三步,掛起。B和C相繼執行acquireQueued(final Node node, int arg)。這個方法讓已經入隊的執行緒嘗試獲取鎖,若失敗則會被掛起。
/**
* 已經入隊的執行緒嘗試獲取鎖
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //標記是否成功獲取鎖
try {
boolean interrupted = false; //標記執行緒是否被中斷過
for (;;) {
final Node p = node.predecessor(); //獲取前驅節點
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node); // 獲取成功,將當前節點設定為head節點
p.next = null; // 原head節點出隊,在某個時間點被GC回收
failed = false; //獲取成功
return interrupted; //返回是否被中斷過
}
// 判斷獲取失敗後是否可以掛起,若可以則掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 執行緒若被中斷,設定interrupted為true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
code裡的註釋已經很清晰的說明了acquireQueued的執行流程。假設B和C在競爭鎖的過程中A一直持有鎖,那麼它們的tryAcquire操作都會失敗,因此會走到第2個if語句中。我們再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了哪些事吧。
/**
* 判斷當前執行緒獲取鎖失敗之後是否需要掛起.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅節點狀態為signal,返回true
return true;
// 前驅節點狀態為CANCELLED
if (ws > 0) {
// 從隊尾向前尋找第一個狀態不為CANCELLED的節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅節點的狀態設定為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 掛起當前執行緒,返回執行緒中斷狀態並重置
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
執行緒入隊後能夠掛起的前提是,它的前驅節點的狀態為SIGNAL,它的含義是“Hi,前面的兄弟,如果你獲取鎖並且出隊後,記得把我喚醒!”。所以shouldParkAfterFailedAcquire會先判斷當前節點的前驅是否狀態符合要求,若符合則返回true,然後呼叫parkAndCheckInterrupt,將自己掛起。如果不符合,再看前驅節點是否>0(CANCELLED),若是那麼向前遍歷直到找到第一個符合要求的前驅,若不是則將前驅節點的狀態設定為SIGNAL。
也就是說當佇列頭部的執行緒執行完了之後,這個執行緒會呼叫後面的佇列的第一個執行緒(百度面試)。
整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心掛起,需要去找個安心的掛起點,同時可以再嘗試下看有沒有機會去嘗試競爭鎖。
最終佇列可能會如下圖所示
執行緒B和C都已經入隊,並且都被掛起。當執行緒A釋放鎖的時候,就會去喚醒執行緒B去獲取鎖啦。
3.3.2 unlock()
unlock相對於lock就簡單很多。原始碼如下
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果理解了加鎖的過程,那麼解鎖看起來就容易多了。流程大致為先嚐試釋放鎖,若釋放成功,那麼檢視頭結點的狀態是否為SIGNAL,如果是則喚醒頭結點的下個節點關聯的執行緒,如果釋放失敗那麼返回false表示解鎖失敗。這裡我們也發現了,每次都只喚起頭結點的下一個節點關聯的執行緒。
最後我們再看下tryRelease的執行過程
/**
* 釋放當前執行緒佔用的鎖
* @param releases
* @return 是否釋放成功
*/
protected final boolean tryRelease(int releases) {
// 計算釋放後state值
int c = getState() - releases;
// 如果不是當前執行緒佔用鎖,那麼丟擲異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 鎖被重入次數為0,表示釋放成功
free = true;
// 清空獨佔執行緒
setExclusiveOwnerThread(null);
}
// 更新state值
setState(c);
return free;
}
這裡入參為1。tryRelease的過程為:當前釋放鎖的執行緒若不持有鎖,則丟擲異常。若持有鎖,計算釋放後的state值是否為0,若為0表示鎖已經被成功釋放,並且則清空獨佔執行緒,最後更新state值,返回free。
3.3.3 小結
用一張流程圖總結一下非公平鎖的獲取鎖的過程。
3.4 FairSync
公平鎖和非公平鎖不同之處在於,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1),這裡不再贅述。
4 超時機制
在ReetrantLock的tryLock(long timeout, TimeUnit unit) 提供了超時獲取鎖的功能。它的語義是在指定的時間內如果獲取到鎖就返回true,獲取不到則返回false。這種機制避免了執行緒無限期的等待鎖釋放。那麼超時的功能是怎麼實現的呢?我們還是用非公平鎖為例來一探究竟。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
還是呼叫了內部類裡面的方法。我們繼續向前探究
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
這裡的語義是:如果執行緒被中斷了,那麼直接丟擲InterruptedException。如果未中斷,先嚐試獲取鎖,獲取成功就直接返回,獲取失敗則進入doAcquireNanos。tryAcquire我們已經看過,這裡重點看一下doAcquireNanos做了什麼。
/**
* 在有限的時間內去競爭鎖
* @return 是否獲取成功
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 起始時間
long lastTime = System.nanoTime();
// 執行緒入隊
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 又是自旋!
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 如果前驅是頭節點並且佔用鎖成功,則將當前節點變成頭結點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 如果已經超時,返回false
if (nanosTimeout <= 0)
return false;
// 超時時間未到,且需要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 阻塞當前執行緒直到超時時間到期
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 更新nanosTimeout
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
//相應中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireNanos的流程簡述為:執行緒先入等待佇列,然後開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在佇列裡找一個安全點把自己掛起直到超時時間過期。這裡為什麼還需要迴圈呢?因為當前執行緒節點的前驅狀態可能不是SIGNAL,那麼在當前這一輪迴圈中執行緒不會被掛起,然後更新超時時間,開始新一輪的嘗試
5、讀寫鎖ReentrantReadWriteLock
介面 ReadWriteLock,有個實現類是ReentrantReadWriteLock
讀讀互不干擾,寫寫互斥,如果有讀也有寫,那麼寫執行緒要優先讀執行緒
對!讀取執行緒不應該互斥!
我們可以用讀寫鎖ReadWriteLock實現:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Data {
private int data;// 共享資料
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public void set(int data) {
rwl.writeLock().lock();// 取到寫鎖
try {
System.out.println(Thread.currentThread().getName() + "準備寫入資料");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
} finally {
rwl.writeLock().unlock();// 釋放寫鎖
}
}
public void get() {
rwl.readLock().lock();// 取到讀鎖
try {
System.out.println(Thread.currentThread().getName() + "準備讀取資料");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
} finally {
rwl.readLock().unlock();// 釋放讀鎖
}
}
}
與互斥鎖定相比,讀-寫鎖定允許對共享資料進行更高級別的併發訪問。雖然一次只有一個執行緒(writer 執行緒)可以修改共享資料,但在許多情況下,任何數量的執行緒可以同時讀取共享資料(reader 執行緒)
從理論上講,與互斥鎖定相比,使用讀-寫鎖定所允許的併發性增強將帶來更大的效能提高。
在實踐中,只有在多處理器上並且只在訪問模式適用於共享資料時,才能完全實現併發性增強。——例如,某個最初用資料填充並且之後不經常對其進行修改的 collection,因為經常對其進行搜尋(比如搜尋某種目錄),所以這樣的 collection 是使用讀-寫鎖定的理想候選者。
6·執行緒間通訊Condition
Condition可以替代傳統的執行緒間通訊,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。
——為什麼方法名不直接叫wait()/notify()/nofityAll()?因為Object的這幾個方法是final的,不可重寫!
傳統執行緒的通訊方式,Condition都可以實現。
注意,Condition是被繫結到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。
Condition的強大之處在於它可以為多個執行緒間建立不同的Condition
看JDK文件中的一個例子:假定有一個繫結的緩衝區,它支援 put 和 take 方法。如果試圖在空的緩衝區上執行take 操作,則在某一個項變得可用之前,執行緒將一直阻塞;如果試圖在滿的緩衝區上執行 put 操作,則在有空間變得可用之前,執行緒將一直阻塞。我們喜歡在單獨的等待 set 中儲存put 執行緒和take 執行緒,這樣就可以在緩衝區中的項或空間變得可用時利用最佳規劃,一次只通知一個執行緒。可以使用兩個Condition 例項來做到這一點。
——其實就是java.util.concurrent.ArrayBlockingQueue的功能
優點:
假設快取佇列中已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒的肯定是寫執行緒。