執行緒活躍性問題及其解決方案
1.什麼是死鎖
例如上圖兩個執行緒互相持有對方需要的鎖,但是都不肯釋放持有的鎖,從而陷入死鎖。 在多個執行緒的情況下,存在環形的依賴關係,這樣就有可能發生死鎖,例如上圖,Thread1持有鎖A想獲取鎖B,Thread2持有鎖B想獲取鎖C,Thread3持有鎖C想獲取鎖A,但是每個執行緒都不肯讓出自己持有的鎖,這樣就發生了死鎖。死鎖發生於併發中,當兩個或更多執行緒互相持有對方的資源,但又不主動釋放,令兩個執行緒都無法前進,從而陷入無盡的等待之中的情況就是死鎖。
2.死鎖的影響
死鎖在不同系統中的影響是不同的,這取決於系統對死鎖的處理能力
在資料庫中可以對事物進行檢測和放棄,如果發生搶佔的情況可以指定某個事務放棄,這樣可以解決死鎖。但是在JVM中不具備自動處理的能力
死鎖發生的概率比較低,但是產生的危害比較大,在多執行緒併發情況下,影響的使用者比較多。
死鎖會導致系統整體崩潰,子系統崩潰,效能降低。並且壓力測試無法發現所有潛在的死鎖
3.發生死鎖的例子
3.1 看程式停止的訊號
/**
* 必定發生死鎖的情況
*/
public class MustDeadLock implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
@Override
public void run () {
System.out.println("flag = " + flag);
if (flag == 1){
synchronized (o1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1持有o1" );
synchronized (o2){
System.out.println("thread1持有o2");
}
}
}
if (flag == 0){
synchronized (o2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2持有o2");
synchronized (o1){
System.out.println("thread2持有o1");
}
}
}
}
public static void main(String[] args) {
MustDeadLock r1 = new MustDeadLock();
MustDeadLock r2 = new MustDeadLock();
r1.flag = 1;
r2.flag = 0;
Thread thread1 = new Thread(r1);
Thread thread2 = new Thread(r2);
thread1.start();
thread2.start();
}
}
複製程式碼
3.2 銀行轉賬發生死鎖
前提條件:需要兩把鎖(將轉賬和被轉賬的執行緒鎖住,保證中間不被幹擾),在成功獲取兩把鎖的情況下,且餘額大於0,則扣除轉賬人,增加收款人的餘額,是原子操作。 順序相反導致死鎖。
/**
* 轉賬的時候遇到了死鎖,一旦開啟註釋,便會發生死鎖
*/
public class TransferMoney implements Runnable {
int flag = 1;
//a和b沒人都有500
static Account a = new Account(500);
static Account b = new Account(500);
public static void main(String[] args) throws InterruptedException {
TransferMoney r1 = new TransferMoney();
TransferMoney r2 = new TransferMoney();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a的餘額為:" + a.balance);
System.out.println("b的餘額為:" + b.balance);
}
@Override
public void run() {
if (flag == 1){
//a向b轉200
transferMoney(a,b,200);
}
if (flag == 0){
//b向a轉200
transferMoney(b,a,200);
}
}
public static void transferMoney(Account from,Account to,int amount){
synchronized (from){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (to){
if (from.balance - amount < 0){
System.out.println("轉賬失敗,餘額不足");
}
from.balance -= amount;
to.balance += amount;
System.out.println("成功轉賬:" + amount + "元");
}
}
}
//賬戶
static class Account{
int balance;
public Account(int balance) {
this.balance = balance;
}
}
}
複製程式碼
如果註釋掉sleep()
就不會發生死鎖,但是添加了sleep()
在這500ms中就會發生死鎖。
3.3 模擬多人轉賬
/**
* 多人轉賬情況下發生死鎖
*/
public class MultiTransferMoney {
//50個賬戶
private static final int NUM_ACCOUNTS = 50;
//每個賬戶有1000元
private static final int NUM_MONEYS = 1000;
//轉賬次數
private static final int NUM_ITERATIONS = 1000000;
//操作賬戶的人數
private static final int NUM_THREADS = 20;
public static void main(String[] args) {
Random random = new Random();
TransferMoney.Account[] accounts = new TransferMoney.Account[NUM_ACCOUNTS];
for (int i = 0; i < accounts.length; i++) {
accounts[i] = new TransferMoney.Account(NUM_MONEYS);
}
class TransferThread extends Thread {
@Override
public void run() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
int fromAccount = random.nextInt(NUM_ACCOUNTS);
int toAccount = random.nextInt(NUM_ACCOUNTS);
int amount = random.nextInt(NUM_MONEYS);
TransferMoney.transferMoney(accounts[fromAccount],accounts[toAccount],amount);
}
}
}
for (int i = 0; i < NUM_THREADS; i++) {
new TransferThread().start();
}
}
}
複製程式碼
死鎖發生的概率隨著賬戶數量的減少而增加
4.發生死鎖的4個必要條件(缺一不可)
- 互斥:當
thread1
拿到lockA
後,其他執行緒就無法獲取到lockA
。 - 請求與保持:當
thread1
拿到lockA
後,還一定要獲取到lockB
。 - 不可剝奪:在資料庫中可以避免發生死鎖是因為資料庫自身可以剝奪某個事務,這樣就會避免死鎖,但是在Java中不可剝奪。
- 迴圈等待:在兩個執行緒中兩個執行緒相互等待,在多個執行緒中每個執行緒首尾相接形成環路,也就是發生迴圈等待。
5.如何定位死鎖
5.1 ThreadMXBean程式碼演示
/**
* 用ThreadMXBean檢測死鎖
*/
public class ThreadMXBeanDetection implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
@Override
public void run() {
if (flag == 1){
synchronized (o1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2){
System.out.println("t1持有o2");
}
}
}
if (flag == 0){
synchronized (o2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1){
System.out.println("t2持有o1");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ThreadMXBeanDetection d1 = new ThreadMXBeanDetection();
ThreadMXBeanDetection d2 = new ThreadMXBeanDetection();
d1.flag = 1;
d2.flag = 0;
Thread t1 = new Thread(d1);
Thread t2 = new Thread(d2);
t1.start();
t2.start();
Thread.sleep(1000);
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0){ //發現了死鎖
for (int i = 0; i < deadlockedThreads.length; i++) {
//通過死鎖的執行緒ID獲取執行緒資訊
ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
System.out.println("發現死鎖" + threadInfo.getThreadName());
}
}
}
}
複製程式碼
發現死鎖後可以通過報警或者日誌的方式對死鎖進行修復。
6.如何修復死鎖
6.1 線上發生死鎖怎麼辦
對於線上問題一定要防患於未然,因為線上上想要沒有任何損失的修復死鎖幾乎是不可能的了。所以需要
- 先將“案發現場”儲存下來然後立刻重啟伺服器。
- 暫時保證線上服務的安全,然後利用剛才留下的資訊立刻定位死鎖,進行修復,然後重新發版。
6.2 修復死鎖的策略
- 避免策略:哲學家就餐的換手方案、轉賬換序方案 思路:避免相反的獲取鎖的順序
- 檢測與恢復策略:一段時間內檢查是否發生死鎖,如果發生死鎖,對資源進行剝奪,從而修復死鎖。
- 鴕鳥策略:鴕鳥這種動物在遇到危險時會把頭埋在地上,這樣就看不到危險了。這也就是說在發生可能性低的時候可以暫時忽略掉死鎖,等到發生後進行人工修復。
6.2.1 避免策略的使用
/**
* 轉賬的時候遇到了死鎖,一旦開啟註釋,便會發生死鎖
*/
public class TransferMoney implements Runnable {
int flag = 1;
//a和b沒人都有500
static Account a = new Account(500);
static Account b = new Account(500);
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
TransferMoney r1 = new TransferMoney();
TransferMoney r2 = new TransferMoney();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a的餘額為:" + a.balance);
System.out.println("b的餘額為:" + b.balance);
}
@Override
public void run() {
if (flag == 1){
//a向b轉200
transferMoney(a,200);
}
if (flag == 0){
//b向a轉200
transferMoney(b,int amount){
class Helper{
public void transfer(){
if (from.balance - amount < 0){
System.out.println("轉賬失敗,餘額不足");
}
from.balance -= amount;
to.balance += amount;
System.out.println("成功轉賬:" + amount + "元");
}
}
//獲取轉入和轉出的hash值
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash){ //通過hash值保證了獲取鎖的順序
synchronized (from) {
synchronized (to) {
new Helper().transfer();
}
}
} else if (fromHash > toHash){
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
} else { //fromHash == toHash
synchronized (lock){ //誰先拿到lock誰就先執行
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}
}
}
//賬戶
static class Account{
int balance;
public Account(int balance) {
this.balance = balance;
}
}
}
複製程式碼
通過修改transfer方法計算轉出和轉入的hash值,通過hash值作比較來設定鎖的獲取順序,這樣可以避免死鎖的發生。
7.哲學家就餐問題
7.1 什麼是哲學家就餐問題
每個哲學家吃飯需要先拿起左手(或右手的叉子)再拿起右手(或左手)的叉子才可以吃飯,等待自己用完了放回原處,叉子再供另外的人使用(暫時不考慮衛生問題:))。
死鎖:如果每個人都同時拿起了左邊的叉子,這樣就無法拿到右手邊的叉子,這樣就造成了等待的問題
/**
* 描述: 演示哲學家就餐問題導致的死鎖
*/
public class DiningPhilosophers {
public static class Philosophers implements Runnable {
private Object leftChopstick;
private Object rightChopstick;
public Philosophers(Object leftChopstick,Object rightChopstick) {
this.leftChopstick = leftChopstick;
this.rightChopstick = rightChopstick;
}
@Override
public void run() {
try {
while (true) {
doAction("Thinking");
synchronized (leftChopstick) {
doAction("Picked up left chopstick");
synchronized (rightChopstick) {
doAction("Picked up right chopstick -eating");
doAction("Put down right chopstick");
}
doAction("Put down left chopstick");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAction(String action) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " " + action);
Thread.sleep((long) (Math.random() * 10));
}
}
public static void main(String[] args) {
//設定哲學家的人數
Philosophers[] philosophers = new Philosophers[5];
//設定筷子的數量,數量與哲學家人數相同
Object[] chopsticks = new Object[philosophers.length];
for (int i = 0; i < chopsticks.length; i++) {
chopsticks[i] = new Object();
}
for (int i = 0; i < philosophers.length; i++) {
Object leftChopstick = chopsticks[i];
Object rightChopstick = chopsticks[(i+1)%chopsticks.length];
philosophers[i] = new Philosophers(leftChopstick,rightChopstick);
new Thread(philosophers[i],"哲學家"+(i+1)+"號").start();
}
}
}
複製程式碼
導致死鎖的一大特徵就是:每個哲學家的左手都拿著筷子,右手無法獲取筷子。
7.2 解決哲學家死鎖問題的4種辦法
- 服務員檢查(避免策略)
- 改變哲學家拿叉子的順序(避免策略)
- 餐票(將餐票數量設定為人數-1)(避免策略)
- 領導調節(檢測與恢復策略)
7.2.1 實現換手策略
public static void main(String[] args) {
//設定哲學家的人數
Philosophers[] philosophers = new Philosophers[5];
//設定筷子的數量,數量與哲學家人數相同
Object[] chopsticks = new Object[philosophers.length];
for (int i = 0; i < chopsticks.length; i++) {
chopsticks[i] = new Object();
}
for (int i = 0; i < philosophers.length; i++) {
Object leftChopstick = chopsticks[i];
Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
if (i == philosophers.length - 1) { //讓這個哲學家換手,避免形成環路
philosophers[i] = new Philosophers(rightChopstick,leftChopstick);
}else{
philosophers[i] = new Philosophers(leftChopstick,rightChopstick);
}
new Thread(philosophers[i],"哲學家" + (i + 1) + "號").start();
}
}
複製程式碼
7.2.2 死鎖檢測與恢復策略
檢測演演算法:鎖的呼叫鏈路圖
- 允許發生死鎖
- 每次呼叫鎖都記錄
- 定期檢查鎖的呼叫鏈路圖是否形成環路
- 一旦發生死鎖,就用死鎖恢復機制進行恢復
恢復方法1:程式終止
逐個終止執行緒,直到死鎖消除
終止順序:
- 1.優先順序(是前臺互動還是後臺處理)
- 2.已佔用資源,還需要的資源
- 3.已經執行的時間
恢復方法2:資源搶佔
把每個分發出去的鎖收回來
讓執行緒回退幾步,這樣就不用結束整個執行緒,成本比較低,但是這樣可能會造成資源一直被搶佔,造成飢餓
8.避免死鎖的有效手段
8.1 設定超時時間(退一步海闊天空)
Lock的tryLock(long timeout,TimeUnit unit)
synchronized不具備嘗試鎖的能力
/**
* 描述: 用tryLock來避免死鎖
*/
public class TryLockDeadLock implements Runnable {
int flag;
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
@Override
public void run() {
for (int i = 0; i < 100; i++) {
if (flag == 1) {
try {
if (lock1.tryLock(800,TimeUnit.MILLISECONDS)) {
System.out.println("執行緒1獲取到了鎖1");
Thread.sleep(new Random().nextInt(1000));
if (lock2.tryLock(800,TimeUnit.MILLISECONDS)) {
System.out.println("執行緒1獲取到了鎖2");
System.out.println("執行緒1成功獲取到兩把鎖");
lock2.unlock();
lock1.unlock();
break;
} else {
System.out.println("執行緒1嘗試獲取鎖2失敗,已重試");
lock1.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("執行緒1獲取鎖1已失敗,已重試");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (flag == 0) {
try {
if (lock2.tryLock(3000,TimeUnit.MILLISECONDS)) {
System.out.println("執行緒2獲取到了鎖2");
Thread.sleep(new Random().nextInt(1000));
if (lock1.tryLock(3000,TimeUnit.MILLISECONDS)) {
System.out.println("執行緒2獲取到了鎖1");
System.out.println("執行緒2成功獲取到兩把鎖");
lock1.unlock();
lock2.unlock();
break;
} else {
System.out.println("執行緒2嘗試獲取鎖1失敗,已重試");
lock2.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("執行緒2獲取鎖2已失敗,已重試");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
TryLockDeadLock r1 = new TryLockDeadLock();
TryLockDeadLock r2 = new TryLockDeadLock();
r1.flag = 1;
r2.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}
}
複製程式碼
8.2 多使用併發類而不是自己設計鎖
ConcurrentHashMap、ConcurrentLinkedQueue、AtomicBoolean等
實際使用時java.util.concurrent.atomic十分有用,簡單方便且效率比使用Lock高
多用併發集合少用同步集合,併發集合比同步集合擴充套件性更好
併發場景需要用到map,首先想到用ConcurrentHashMap
8.3 降低使用鎖的粒度,避免使用同一個鎖
保護的範圍大,效率低,容易發生死鎖
8.4 儘量使用同步程式碼塊而非同步方法
使用同步程式碼塊相對於同步方法,縮小了保護的範圍,增加了對物件的控制權,降低發生死鎖的風險。
8.5 給執行緒起個有意義的名字
8.6 避免鎖的巢狀
例如上面的MustDeadLock類
8.7 分配資源前先看能不能收回來
例如:銀行家演演算法
8.8 專鎖專用
儘量不要多個功能使用同一把鎖
9.其他活性問題
死鎖是最常見的活躍性問題,不過除了剛才的死鎖之外,還有一些類似的問題,會導致程式無法順利執行,統稱為活躍性問題。
9.1 活鎖
9.1.1 什麼是活鎖
再回到前面的哲學家就餐問題,發生死鎖是因為每個哲學家都是先拿到左手的餐具,永遠在等待右手邊的餐具(或者相反),這樣就會發生死鎖。
活鎖相對與死鎖更加智慧一點,這些哲學家同時進入餐廳,同時拿起左邊的餐具然後會等待5分鐘,然後放下餐具,再等5分鐘,又同時拿起餐具,這樣也會導致每個哲學家無法吃飯。
換成程式中的話就是說:程式一直在執行,但是屬於無用功,白白浪費資源。
9.1.2 活鎖的出現
/**
* 描述: 演示活鎖問題
*/
public class LiveLock {
static class Spoon{
private Diner onwer;
public Spoon(Diner onwer) {
this.onwer = onwer;
}
public Diner getOnwer() {
return onwer;
}
public void setOnwer(Diner onwer) {
this.onwer = onwer;
}
public synchronized void use(){
System.out.printf("%s has eaten!",onwer.name);
}
}
static class Diner{
private String name;
private boolean isHungry;
public Diner(String name) {
this.name = name;
this.isHungry = true;
}
public void earWith(Spoon spoon,Diner spouse){
while (isHungry){
//自己沒有拿到勺子
if (spoon.onwer != this){
//等待伴侶吃飯
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//如果伴侶是飢餓的
if (spouse.isHungry){
System.out.println(name + ": 親愛的" + spouse.name +"你先吃吧");
//將勺子給伴侶
spoon.setOnwer(spouse);
continue;
}
//我可以吃飯了
spoon.use();
//吃完了改變hungry的狀態
isHungry = false;
System.out.println(name + ": 我吃完了");
//將勺子給伴侶
spoon.setOnwer(spouse);
}
}
}
public static void main(String[] args) {
Diner husband = new Diner("牛郎");
Diner wife = new Diner("織女");
Spoon spoon = new Spoon(husband);
new Thread(new Runnable() {
@Override
public void run() {
husband.earWith(spoon,wife);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
wife.earWith(spoon,husband);
}
}).start();
}
}
複製程式碼
9.1.3 如何解決活鎖問題
出現活鎖的原因:重試機制不變,訊息佇列始終重試,吃飯始終謙讓
解決: 加入隨機因素
9.1.4 程式碼演示
/**
* 描述: 演示活鎖問題
*/
public class LiveLock {
static class Spoon{
private Diner onwer;
public Spoon(Diner onwer) {
this.onwer = onwer;
}
public Diner getOnwer() {
return onwer;
}
public void setOnwer(Diner onwer) {
this.onwer = onwer;
}
public synchronized void use(){
System.out.printf("%s has eaten!",Diner spouse){
while (isHungry){
//自己沒有拿到勺子
if (spoon.onwer != this){
//等待伴侶吃飯
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
Random random = new Random();
//如果伴侶是飢餓的
if (spouse.isHungry && random.nextInt(10) < 9){ //降低給勺子的機率
System.out.println(name + ": 親愛的" + spouse.name +"你先吃吧");
//將勺子給伴侶
spoon.setOnwer(spouse);
continue;
}
//我可以吃飯了
spoon.use();
//吃完了改變hungry的狀態
isHungry = false;
System.out.println(name + ": 我吃完了");
//將勺子給伴侶
spoon.setOnwer(spouse);
}
}
}
public static void main(String[] args) {
Diner husband = new Diner("牛郎");
Diner wife = new Diner("織女");
Spoon spoon = new Spoon(husband);
new Thread(new Runnable() {
@Override
public void run() {
husband.earWith(spoon,husband);
}
}).start();
}
}
複製程式碼
9.2 飢餓
當執行緒需要某些資源(例如CPU),卻始終得不到
飢餓的原因
- 當某個執行緒的執行優先順序過低,始終得不到CPU資源
- 某個執行緒一直持有鎖,卻從不釋放鎖
- 某程式始終佔用某檔案的寫鎖。
飢餓的危害
飢餓可能會導致響應性變差:例如一個執行緒負責前臺的響應,另一條執行緒負責後臺的資料處理,但是由於前臺執行緒優先順序比較低始終得不到執行,這樣會導致使用者體驗變差。